summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-17 13:03:17 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-17 13:03:17 +0100
commit040a7b7c421a9bb3b5f4ab50eb06dda0b12d42da (patch)
treef297825a1f66d3c6c885dfb0e846c9ae1705d5ac
parent563b889cabb7a5877fe2b0f18628a56b035976fe (diff)
downloadrabbitmq-server-040a7b7c421a9bb3b5f4ab50eb06dda0b12d42da.tar.gz
The use of the in-memory run length queue in disk_only queue is considered a show stopper, and rightly so. I personally don't like the idea of adding additional tokens to the disk queue to indicate queue switch because it can substantially increase the number of OS calls and writes and reads from disk and, eg, getting queue length right and memory size right is made a fair bit more complex. So abandon the two queues idea.
Instead, store the persistent flag in the stop byte on disk. Then on startup, the persistent flag turns up in the MsgLocations ets table. This is all done and all tests pass. The next stage is that on start up, go through each queue and just wipe out non-persistent messages. This should be pretty fast. Then call the shuffle_up function as is currently being done. This will eliminate the gaps in sequences. This really should be enough. Then the mixed_queue can go back to just talking about a single queue.
-rw-r--r--src/rabbit_disk_queue.erl152
-rw-r--r--src/rabbit_misc.erl8
-rw-r--r--src/rabbit_mixed_queue.erl5
3 files changed, 98 insertions, 67 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 868eab4a..4eef884f 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -53,7 +53,8 @@
-include("rabbit.hrl").
-define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK, 255).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
-define(INTEGER_SIZE_BYTES, 8).
-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
@@ -101,8 +102,8 @@
%% The components:
%%
-%% MsgLocation: this is a dets table which contains:
-%% {MsgId, RefCount, File, Offset, TotalSize}
+%% MsgLocation: this is a (d)ets table which contains:
+%% {MsgId, RefCount, File, Offset, TotalSize, IsPersistent}
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
%% Sequences: this is an ets table which contains:
@@ -393,7 +394,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
?FILE_EXTENSION_DETS)},
{min_no_slots, 1024*1024},
%% man says this should be <= 32M. But it works...
- {max_no_slots, 1024*1024*1024},
+ {max_no_slots, 30*1024*1024},
{type, set}
]),
@@ -509,8 +510,8 @@ handle_cast({ack, Q, MsgSeqIds}, State) ->
handle_cast({auto_ack_next_message, Q}, State) ->
{ok, State1} = internal_auto_ack(Q, State),
noreply(State1);
-handle_cast({tx_publish, Message = #basic_message { guid = MsgId }}, State) ->
- {ok, State1} = internal_tx_publish(MsgId, Message, State),
+handle_cast({tx_publish, Message}, State) ->
+ {ok, State1} = internal_tx_publish(Message, State),
noreply(State1);
handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
@@ -636,8 +637,8 @@ memory_use(#dqstate { operation_mode = disk_only,
(WordSize * (ets:info(FileSummary, memory) +
ets:info(Cache, memory) +
ets:info(Sequences, memory))) +
- round(MnesiaSizeEstimate) +
- round(MsgLocationSizeEstimate).
+ rabbit_misc:ceil(MnesiaSizeEstimate) +
+ rabbit_misc:ceil(MsgLocationSizeEstimate).
to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
State;
@@ -872,7 +873,8 @@ insert_into_cache(Message = #basic_message { guid = MsgId },
true -> 0;
false -> 1
end,
- true = ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
+ true =
+ ets:insert_new(Cache, {MsgId, Message, MsgSize, Count}),
ok.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -890,8 +892,9 @@ internal_deliver(Q, ReadMsg, FakeDeliver,
{Q, ReadSeqId+1, WriteSeqId}),
{ok,
case Result of
- {MsgId, Delivered, {MsgId, ReadSeqId}} ->
- {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
+ {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId},
+ Remaining};
{Message, BodySize, Delivered, {MsgId, ReadSeqId}} ->
{Message, BodySize, Delivered, {MsgId, ReadSeqId},
Remaining}
@@ -927,7 +930,7 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
[Obj =
#dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- [{MsgId, RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
dets_ets_lookup(State, MsgId),
ok =
if FakeDeliver orelse Delivered -> ok;
@@ -940,12 +943,13 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
case fetch_and_increment_cache(MsgId, State) of
not_found ->
{FileHdl, State1} = get_read_handle(File, Offset, State),
- {ok, {MsgBody, BodySize}} =
+ {ok, {MsgBody, IsPersistent, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
- Message = bin_to_msg(MsgBody),
+ #basic_message { is_persistent=IsPersistent, guid=MsgId } =
+ Message = bin_to_msg(MsgBody),
ok = if RefCount > 1 orelse ForceInCache ->
- insert_into_cache(Message, BodySize,
- ForceInCache, State1);
+ insert_into_cache
+ (Message, BodySize, ForceInCache, State1);
true -> ok
%% it's not in the cache and we only
%% have 1 queue with the message. So
@@ -959,13 +963,14 @@ internal_read_message(Q, ReadSeqId, ReadMsg, FakeDeliver, ForceInCache, State) -
State}
end;
false ->
- {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
+ {ok, {MsgId, IsPersistent, Delivered, {MsgId, ReadSeqId}}, State}
end.
internal_auto_ack(Q, State) ->
case internal_deliver(Q, false, true, State) of
{ok, empty, State1} -> {ok, State1};
- {ok, {_MsgId, _Delivered, MsgSeqId, _Remaining}, State1} ->
+ {ok, {_MsgId, _IsPersistent, _Delivered, MsgSeqId, _Remaining},
+ State1} ->
remove_messages(Q, [MsgSeqId], true, State1)
end.
@@ -985,7 +990,7 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
Files =
lists:foldl(
fun ({MsgId, SeqId}, Files1) ->
- [{MsgId, RefCount, File, Offset, TotalSize}] =
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
dets_ets_lookup(State, MsgId),
Files2 =
case RefCount of
@@ -1007,8 +1012,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
ok = dets_ets_insert(
- State, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
+ State, {MsgId, RefCount - 1, File, Offset,
+ TotalSize, IsPersistent}),
Files1
end,
ok = case MnesiaDelete of
@@ -1023,7 +1028,8 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
State1 = compact(Files, State),
{ok, State1}.
-internal_tx_publish(MsgId, Message,
+internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId },
State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
current_offset = CurOffset,
@@ -1032,10 +1038,11 @@ internal_tx_publish(MsgId, Message,
case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
- {ok, TotalSize} =
- append_message(CurHdl, MsgId, msg_to_bin(Message)),
- true = dets_ets_insert_new(State, {MsgId, 1, CurName,
- CurOffset, TotalSize}),
+ {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message),
+ IsPersistent),
+ true = dets_ets_insert_new
+ (State, {MsgId, 1, CurName,
+ CurOffset, TotalSize, IsPersistent}),
[{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
ets:lookup(FileSummary, CurName),
ValidTotalSize1 = ValidTotalSize + TotalSize +
@@ -1051,10 +1058,10 @@ internal_tx_publish(MsgId, Message,
maybe_roll_to_new_file(
NextOffset, State #dqstate {current_offset = NextOffset,
current_dirty = true});
- [{MsgId, RefCount, File, Offset, TotalSize}] ->
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] ->
%% We already know about it, just update counter
ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
- Offset, TotalSize}),
+ Offset, TotalSize, IsPersistent}),
{ok, State}
end.
@@ -1080,7 +1087,8 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
lists:foldl(
fun (MsgId, {InCurFileAcc, SeqId}) ->
[{MsgId, _RefCount, File, Offset,
- _TotalSize}] = dets_ets_lookup(State, MsgId),
+ _TotalSize, _IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
ok = mnesia:write(
rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
@@ -1109,7 +1117,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
internal_publish(Q, Message = #basic_message { guid = MsgId },
IsDelivered, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
- internal_tx_publish(MsgId, Message, State),
+ internal_tx_publish(Message, State),
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
ok = mnesia:dirty_write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
@@ -1363,7 +1371,7 @@ sort_msg_locations_by_offset(Asc, List) ->
true -> fun erlang:'<'/2;
false -> fun erlang:'>'/2
end,
- lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
+ lists:sort(fun ({_, _, _, OffA, _, _}, {_, _, _, OffB, _, _}) ->
Comp(OffA, OffB)
end, List).
@@ -1402,7 +1410,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
read_ahead, delayed_write]),
Worklist =
lists:dropwhile(
- fun ({_, _, _, Offset, _})
+ fun ({_, _, _, Offset, _, _})
when Offset /= DestinationContiguousTop ->
%% it cannot be that Offset ==
%% DestinationContiguousTop because if it
@@ -1416,7 +1424,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
end, sort_msg_locations_by_offset(
true, dets_ets_match_object(State,
{'_', '_', Destination,
- '_', '_'}))),
+ '_', '_', '_'}))),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State),
@@ -1438,7 +1446,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
sort_msg_locations_by_offset(
true, dets_ets_match_object(State,
{'_', '_', Source,
- '_', '_'})),
+ '_', '_', '_'})),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1452,14 +1460,15 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, State) ->
{FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
- fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize, IsPersistent},
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
%% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
- CurOffset, TotalSize}),
+ ok = dets_ets_insert
+ (State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize, IsPersistent}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
@@ -1643,11 +1652,13 @@ load_messages(undefined, [],
State;
load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
- Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
- [] -> 0;
- L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
- sort_msg_locations_by_offset(false, L),
- MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ Offset =
+ case dets_ets_match_object(State, {'_', '_', Left, '_', '_', '_'}) of
+ [] -> 0;
+ L ->
+ [ {_MsgId, _RefCount, Left, MaxOffset, TotalSize, _IsPersistent}
+ | _ ] = sort_msg_locations_by_offset(false, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
State #dqstate { current_file_num = Num, current_file_name = Left,
current_offset = Offset };
@@ -1656,7 +1667,7 @@ load_messages(Left, [File|Files],
%% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
- fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ fun ({MsgId, IsPersistent, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case erlang:length(mnesia:dirty_index_match_object
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
@@ -1666,9 +1677,9 @@ load_messages(Left, [File|Files],
msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
- true =
- dets_ets_insert_new(State, {MsgId, RefCount, File,
- Offset, TotalSize}),
+ true = dets_ets_insert_new
+ (State, {MsgId, RefCount, File,
+ Offset, TotalSize, IsPersistent}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
@@ -1706,20 +1717,22 @@ verify_messages_in_mnesia(MsgIds) ->
msg_id))
end, MsgIds).
+grab_msg_id({MsgId, _IsPersistent, _TotalSize, _FileOffset}) ->
+ MsgId.
+
recover_crashed_compactions1(Files, TmpFile) ->
- GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFile, Files),
%% [{MsgId, TotalSize, FileOffset}]
{ok, UncorruptedMessagesTmp} =
scan_file_for_valid_messages(form_filename(TmpFile)),
- MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
+ MsgIdsTmp = lists:map(fun grab_msg_id/1, UncorruptedMessagesTmp),
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
verify_messages_in_mnesia(MsgIdsTmp),
{ok, UncorruptedMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
+ MsgIds = lists:map(fun grab_msg_id/1, UncorruptedMessages),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1788,7 +1801,7 @@ recover_crashed_compactions1(Files, TmpFile) ->
{ok, MainMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIdsMain = lists:map(GrabMsgId, MainMessages),
+ MsgIdsMain = lists:map(fun grab_msg_id/1, MainMessages),
%% check that everything in MsgIds is in MsgIdsMain
true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
MsgIds),
@@ -1833,16 +1846,20 @@ get_disk_queue_files() ->
%% ---- RAW READING AND WRITING OF FILES ----
-append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
+append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
BodySize = size(MsgBody),
MsgIdBin = term_to_binary(MsgId),
MsgIdBinSize = size(MsgIdBin),
TotalSize = BodySize + MsgIdBinSize,
+ StopByte = case IsPersistent of
+ true -> ?WRITE_OK_PERSISTENT;
+ false -> ?WRITE_OK_TRANSIENT
+ end,
case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
MsgIdBinSize:?INTEGER_SIZE_BITS,
MsgIdBin:MsgIdBinSize/binary,
MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of
+ StopByte:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, TotalSize};
KO -> KO
end.
@@ -1856,9 +1873,14 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
MsgIdBinSize:?INTEGER_SIZE_BITS,
Rest:TotalSizeWriteOkBytes/binary>>} ->
BodySize = TotalSize - MsgIdBinSize,
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest,
- {ok, {MsgBody, BodySize}};
+ case Rest of
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, false, BodySize}};
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>> ->
+ {ok, {MsgBody, true, BodySize}}
+ end;
KO -> KO
end;
KO -> KO
@@ -1876,15 +1898,15 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
{ok, eof} -> {ok, Acc};
{ok, {corrupted, NextOffset}} ->
scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
- {ok, {ok, MsgId, TotalSize, NextOffset}} ->
- scan_file_for_valid_messages(FileHdl, NextOffset,
- [{MsgId, TotalSize, Offset}|Acc]);
+ {ok, {ok, MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ scan_file_for_valid_messages(
+ FileHdl, NextOffset,
+ [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
{ok, Acc}
end.
-
read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
@@ -1915,9 +1937,15 @@ read_next_file_entry(FileHdl, Offset) ->
?FILE_PACKING_ADJUSTMENT,
case file:read(FileHdl, 1) of
{ok,
- <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
- {ok, {ok, binary_to_term(MsgId),
- TotalSize, NextOffset}};
+ <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ false, TotalSize, NextOffset}};
+ {ok,
+ <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ {ok, binary_to_term(MsgId),
+ true, TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{ok, {corrupted, NextOffset}};
KO -> KO
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 176ddddb..fc30834e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -446,10 +446,12 @@ unfold(Fun, Acc, Init) ->
false -> {Acc, Init}
end.
-ceil(N) when N - trunc(N) > 0 ->
- 1 + trunc(N);
ceil(N) ->
- N.
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
keygets(Keys, KeyList) ->
lists:reverse(
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index d864d9b2..425d7763 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -311,7 +311,8 @@ publish_delivered(Msg =
%% must call phantom_deliver otherwise the msg remains at
%% the head of the queue. This is synchronous, but
%% unavoidable as we need the AckTag
- {MsgId, false, AckTag, 0} = rabbit_disk_queue:phantom_deliver(PubQ),
+ {MsgId, IsPersistent, false, AckTag, 0} =
+ rabbit_disk_queue:phantom_deliver(PubQ),
{ok, AckTag, State1};
false ->
%% in this case, we don't actually care about the ack, so
@@ -340,7 +341,7 @@ deliver(State = #mqstate { msg_buf = MsgBuf, queue = Q,
AckTag1 =
case IsDurable andalso IsPersistent of
true ->
- {MsgId, IsDelivered1, AckTag2, _PersistRem}
+ {MsgId, IsPersistent, IsDelivered1, AckTag2, _PRem}
= rabbit_disk_queue:phantom_deliver(Q),
AckTag2;
false ->