summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilip Kuryloski <pkuryloski@pivotal.io>2020-03-12 20:37:29 +0100
committerPhilip Kuryloski <pkuryloski@pivotal.io>2020-03-13 10:29:35 +0100
commite3e428f877f3d98416ec2d8332854adfb457d062 (patch)
tree4d40e5e52dd3969de26c2d2417055f748aa33558
parentdd8ff0c9bdcacb54004ea1ca61e64449df83dd28 (diff)
downloadrabbitmq-server-git-e3e428f877f3d98416ec2d8332854adfb457d062.tar.gz
Rebuild messages indicies using less memorybackfill_gatherer
When rebuilding message indicies, we would produce an in memory list with a tuple of Id, Size & Offset for each message in each file. This refactor avoids creation of that list in an effort to reduce memory usage during node startup.
-rw-r--r--src/rabbit_msg_store.erl54
-rw-r--r--test/gatherer_SUITE.erl2
2 files changed, 31 insertions, 25 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 19a437484c..2397cbd9a0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1689,20 +1689,22 @@ recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
ok = file_handle_cache:delete(TmpHdl),
ok.
-scan_file_for_valid_messages(Dir, FileName) ->
+foldl_messages_in_message_file(Fun, Acc0, Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
form_filename(Dir, FileName)),
- fun scan_fun/2, []),
+ Fun, Acc0),
ok = file_handle_cache:close(Hdl),
Valid;
- {error, enoent} -> {ok, [], 0};
+ {error, enoent} -> {ok, Acc0, 0};
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
- [{MsgId, TotalSize, Offset} | Acc].
+scan_file_for_valid_messages(Dir, FileName) ->
+ foldl_messages_in_message_file(fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
+ [{MsgId, TotalSize, Offset} | Acc]
+ end, [], Dir, FileName).
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
@@ -1732,6 +1734,7 @@ build_index(true, _StartupFunState,
sum_file_size = SumFileSize + FileSize,
current_file = File }}
end, {0, State}, FileSummaryEts);
+
build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
State = #msstate { dir = Dir }) ->
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -1780,43 +1783,46 @@ build_index(Gatherer, Left, [File|Files], State) ->
build_index_worker(Gatherer, State = #msstate { dir = Dir },
Left, File, Files) ->
- {ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- {ValidMessages, ValidTotalSize} =
- lists:foldl(
- fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ FileName = filenum_to_name(File),
+ rabbit_log:debug("Rebuilding message index from ~p~n",
+ [form_filename(Dir, FileName)]),
+ {ok, {LastValidMessage, ValidTotalSize}, FileSize} =
+ foldl_messages_in_message_file(
+ fun({MsgId, TotalSize, Offset, _Msg}, {_LastMsg, AccTotalSize} = Acc) ->
case index_lookup(MsgId, State) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
total_size = TotalSize },
State),
- {[Obj | VMAcc], VTSAcc + TotalSize};
+ Message = {MsgId, TotalSize, Offset},
+ {Message, AccTotalSize + TotalSize};
_ ->
- {VMAcc, VTSAcc}
+ Acc
end
- end, {[], 0}, Messages),
+ end,
+ {undefined, 0},
+ Dir, FileName),
{Right, FileSize1} =
case Files of
%% if it's the last file, we'll truncate to remove any
%% rubbish above the last valid message. This affects the
%% file size.
- [] -> {undefined, case ValidMessages of
- [] -> 0;
- _ -> {_MsgId, TotalSize, Offset} =
- lists:last(ValidMessages),
+ [] -> {undefined, case LastValidMessage of
+ undefined -> 0;
+ _ -> {_MsgId, TotalSize, Offset} = LastValidMessage,
Offset + TotalSize
end};
[F|_] -> {F, FileSize}
end,
ok = gatherer:in(Gatherer, #file_summary {
- file = File,
- valid_total_size = ValidTotalSize,
- left = Left,
- right = Right,
- file_size = FileSize1,
- locked = false,
- readers = 0 }),
+ file = File,
+ valid_total_size = ValidTotalSize,
+ left = Left,
+ right = Right,
+ file_size = FileSize1,
+ locked = false,
+ readers = 0 }),
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
diff --git a/test/gatherer_SUITE.erl b/test/gatherer_SUITE.erl
index ba57455438..629c05dc0a 100644
--- a/test/gatherer_SUITE.erl
+++ b/test/gatherer_SUITE.erl
@@ -10,7 +10,7 @@
%%
%% The Original Code is RabbitMQ.
%%
-%% Copyright (c) 2020 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(gatherer_SUITE).