summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-20 03:43:21 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-20 03:43:21 +0000
commit5307e4c6a7f21458fda7fb97020dd838005046e2 (patch)
tree7989c1dcdd689a11903ea8475cf74776039c486d
parent6d3e84b4d013b0dc306927324de34b0d41133c84 (diff)
downloadrabbitmq-server-5307e4c6a7f21458fda7fb97020dd838005046e2.tar.gz
background lazy GC in and working
-rw-r--r--include/rabbit_msg_store.hrl51
-rw-r--r--src/rabbit_msg_store.erl523
-rw-r--r--src/rabbit_msg_store_ets_index.erl71
-rw-r--r--src/rabbit_msg_store_gc.erl249
-rw-r--r--src/rabbit_msg_store_misc.erl74
5 files changed, 611 insertions, 357 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
new file mode 100644
index 00000000..925d5d8e
--- /dev/null
+++ b/include/rabbit_msg_store.hrl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-record(msg_location,
+ {msg_id, ref_count, file, offset, total_size}).
+
+-record(file_summary,
+ {file, valid_total_size, contiguous_top, left, right, file_size,
+ locked}).
+
+-define(BINARY_MODE, [raw, binary]).
+-define(READ_MODE, [read]).
+-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
+-define(WRITE_MODE, [write]).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
+
+-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index c8d27ba6..f40c6270 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,16 +36,16 @@
-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
sync/2]).
--export([sync/0]). %% internal
+-export([sync/0, gc_done/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
-define(SERVER, ?MODULE).
--define(FILE_SIZE_LIMIT, (16*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
--define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
+
+-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
%%----------------------------------------------------------------------------
@@ -54,6 +54,7 @@
-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(file_path() :: any()).
+-type(file_num() :: non_neg_integer()).
-spec(start_link/3 ::
(file_path(),
@@ -65,6 +66,7 @@
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
+-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok').
-endif.
@@ -72,12 +74,12 @@
-record(msstate,
{dir, %% store directory
- msg_locations, %% where are messages?
+ index_module, %% the module for index ops
+ index_state, %% where are messages?
file_summary, %% what's in the files?
current_file, %% current file name as number
current_file_handle, %% current file handle
%% since the last fsync?
- file_size_limit, %% how big can our files get?
file_handle_cache, %% file handle cache
on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
@@ -85,36 +87,17 @@
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
pending_gc_completion, %% things to do once GC completes
- gc_pid %% pid of the GC process
+ gc_running %% is the GC currently working?
}).
--record(msg_location,
- {msg_id, ref_count, file, offset, total_size}).
-
--record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size,
- locked}).
-
--record(gcstate,
- {dir
- }).
+-include("rabbit_msg_store.hrl").
--define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
--define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
--define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
--define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary).
+-define(CACHE_ETS_NAME, rabbit_msg_store_cache).
%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
+%% It is not recommended to set this to < 0.5
-define(GARBAGE_FRACTION, 0.5).
--define(BINARY_MODE, [raw, binary]).
--define(READ_MODE, [read]).
--define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
--define(WRITE_MODE, [write]).
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
%% The components:
%%
%% MsgLocation: this is an ets table which contains:
@@ -249,6 +232,8 @@ remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+gc_done(Reclaimed, Source, Destination) ->
+ gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -259,21 +244,21 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- MsgLocations = ets:new(?MSG_LOC_NAME,
- [set, protected, {keypos, #msg_location.msg_id}]),
+ IndexModule = rabbit_msg_store_ets_index,
+ IndexState = IndexModule:init(),
InitFile = 0,
FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
- [ordered_set, protected,
+ [ordered_set, public,
{keypos, #file_summary.file}]),
MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
State =
#msstate { dir = Dir,
- msg_locations = MsgLocations,
+ index_module = IndexModule,
+ index_state = IndexState,
file_summary = FileSummary,
current_file = InitFile,
current_file_handle = undefined,
- file_size_limit = ?FILE_SIZE_LIMIT,
file_handle_cache = dict:new(),
on_sync = [],
sync_timer_ref = undefined,
@@ -281,7 +266,7 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
sum_valid_data = 0,
sum_file_size = 0,
pending_gc_completion = [],
- gc_pid = undefined
+ gc_running = false
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -297,11 +282,15 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
build_index(Files, State),
%% read is only needed so that we can seek
- {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile),
- [read | ?WRITE_MODE]),
+ {ok, FileHdl} = rabbit_msg_store_misc:open_file(
+ Dir, rabbit_msg_store_misc:filenum_to_name(CurFile),
+ [read | ?WRITE_MODE]),
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
+ {ok, _Pid} = rabbit_msg_store_gc:start_link(
+ Dir, IndexState, FileSummary, IndexModule),
+
{ok, State1 #msstate { current_file_handle = FileHdl }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -355,10 +344,12 @@ handle_cast({write, MsgId, Msg},
{ sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }
)));
- StoreEntry = #msg_location { ref_count = RefCount } ->
- %% We already know about it, just update counter
- ok = index_update(StoreEntry #msg_location {
- ref_count = RefCount + 1 }, State),
+ #msg_location { ref_count = RefCount } ->
+ %% We already know about it, just update counter. Only
+ %% update field otherwise bad interaction with concurrent GC
+ ok = index_update_fields(MsgId,
+ {#msg_location.ref_count, RefCount + 1},
+ State),
noreply(State)
end;
@@ -388,15 +379,27 @@ handle_cast({sync, MsgIds, K},
end;
handle_cast(sync, State) ->
- noreply(sync(State)).
+ noreply(sync(State));
-%% handle_cast({gc_finished, GCPid, RemainingFile, DeletedFile, MsgLocations},
-%% State = #msstate { file_summary = FileSummary,
-%% gc_pid = GCPid }) ->
-%% true = ets:delete(FileSummary, DeletedFile),
-%% true = ets:insert(FileSummary, RemainingFile),
-%% State1 = lists:foldl(fun index_insert/2, State, MsgLocations),
-%% noreply(maybe_compact(run_pending(State1))).
+handle_cast({gc_done, Reclaimed, Source, Dest},
+ State = #msstate { sum_file_size = SumFileSize,
+ gc_running = true,
+ file_summary = FileSummary }) ->
+ %% we always move data left, so Source has gone and was on the
+ %% right, so need to make dest = source.right.left, and also
+ %% dest.right = source.right
+ [#file_summary { left = Dest, right = SourceRight, locked = true }] =
+ ets:lookup(FileSummary, Source),
+ %% this could fail if SourceRight == undefined
+ ets:update_element(FileSummary, SourceRight,
+ {#file_summary.left, Dest}),
+ true = ets:update_element(FileSummary, Dest,
+ [{#file_summary.locked, false},
+ {#file_summary.right, SourceRight}]),
+ true = ets:delete(FileSummary, Source),
+ noreply(run_pending(
+ State #msstate { sum_file_size = SumFileSize - Reclaimed,
+ gc_running = false })).
handle_info(timeout, State) ->
noreply(sync(State));
@@ -408,9 +411,13 @@ handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
+terminate(_Reason, State = #msstate { index_state = IndexState,
+ index_module = IndexModule,
file_summary = FileSummary,
current_file_handle = FileHdl }) ->
+ %% stop the gc first, otherwise it could be working and we pull
+ %% out the ets tables from under it.
+ ok = rabbit_msg_store_gc:stop(),
State1 = case FileHdl of
undefined -> State;
_ -> State2 = sync(State),
@@ -418,9 +425,9 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
State2
end,
State3 = close_all_handles(State1),
- ets:delete(MsgLocations),
ets:delete(FileSummary),
- State3 #msstate { msg_locations = undefined,
+ IndexModule:terminate(IndexState),
+ State3 #msstate { index_state = undefined,
file_summary = undefined,
current_file_handle = undefined }.
@@ -428,11 +435,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_pre_hibernate(State) ->
- {Result, State1} = maybe_compact1(State),
- {case Result of
- true -> insomniate;
- false -> hibernate
- end, State1}.
+ {hibernate, maybe_compact(State)}.
%%----------------------------------------------------------------------------
%% general helper functions
@@ -465,27 +468,12 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-form_filename(Dir, Name) -> filename:join(Dir, Name).
-
-filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
-
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
sort_file_names(FileNames) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
FileNames).
-preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
- ok = file_handle_cache:truncate(Hdl),
- {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
- ok.
-
-truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
- {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
- ok = file_handle_cache:truncate(FileHdl),
- ok = preallocate(FileHdl, Highpoint, Lowpoint).
-
sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs }) ->
State1 = stop_sync_timer(State),
@@ -556,8 +544,8 @@ read_message(MsgId, State =
remove_message(MsgId, State = #msstate { file_summary = FileSummary,
sum_valid_data = SumValid }) ->
- StoreEntry = #msg_location { ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize } =
+ #msg_location { ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize } =
index_lookup(MsgId, State),
case RefCount of
1 ->
@@ -582,18 +570,22 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary,
end;
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
- ok = index_update(StoreEntry #msg_location
- { ref_count = RefCount - 1 }, State),
+ %% only update field, otherwise bad interaction with concurrent GC
+ ok = index_update_fields(MsgId,
+ {#msg_location.ref_count, RefCount - 1},
+ State),
State
end.
add_to_pending_gc_completion(
Op, State = #msstate { pending_gc_completion = Pending }) ->
- State #msstate { pending_gc_completion = [Op, Pending] }.
+ State #msstate { pending_gc_completion = [Op | Pending] }.
+run_pending(State = #msstate { pending_gc_completion = [] }) ->
+ State;
run_pending(State = #msstate { pending_gc_completion = Pending }) ->
State1 = State #msstate { pending_gc_completion = [] },
- lists:foldl(fun run_pending/2, State1, Pending).
+ lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
run_pending({read, MsgId, From}, State) ->
case read_message(MsgId, State) of
@@ -622,19 +614,16 @@ close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) ->
case dict:find(FileNum, FHC) of
{ok, Hdl} -> {Hdl, State};
- error -> new_handle(FileNum, filenum_to_name(FileNum),
+ error -> new_handle(FileNum,
+ rabbit_msg_store_misc:filenum_to_name(FileNum),
[read | ?BINARY_MODE], State)
end.
new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC,
dir = Dir }) ->
- {ok, Hdl} = open_file(Dir, FileName, Mode),
+ {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode),
{Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}.
-open_file(Dir, FileName, Mode) ->
- file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
-
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -673,28 +662,25 @@ insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) ->
%% index
%%----------------------------------------------------------------------------
-index_lookup(Key, #msstate { msg_locations = MsgLocations }) ->
- case ets:lookup(MsgLocations, Key) of
- [] -> not_found;
- [Entry] -> Entry
- end.
+index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
+ Index:lookup(Key, State).
-index_insert(Obj, #msstate { msg_locations = MsgLocations }) ->
- true = ets:insert_new(MsgLocations, Obj),
- ok.
+index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
+ Index:insert(Obj, State).
-index_update(Obj, #msstate { msg_locations = MsgLocations }) ->
- true = ets:insert(MsgLocations, Obj),
- ok.
+index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
+ Index:update(Obj, State).
-index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
- true = ets:delete(MsgLocations, Key),
- ok.
+index_update_fields(Key, Updates,
+ #msstate { index_module = Index, index_state = State }) ->
+ Index:update_fields(Key, Updates, State).
-index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
- MatchHead = #msg_location { file = File, _ = '_' },
- ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
- ok.
+index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
+ Index:delete(Key, State).
+
+index_delete_by_file(File, #msstate { index_module = Index,
+ index_state = State }) ->
+ Index:delete_by_file(File, State).
%%----------------------------------------------------------------------------
%% recovery
@@ -762,7 +748,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% consist only of valid messages. Plan: Truncate the main file
%% back to before any of the files in the tmp file and copy
%% them over again
- TmpPath = form_filename(Dir, TmpFileName),
+ TmpPath = rabbit_msg_store_misc:form_filename(Dir, TmpFileName),
case is_sublist(MsgIdsTmp, MsgIds) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
%% note this also catches the case when the tmp file
@@ -794,8 +780,8 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% are in the tmp file
true = is_disjoint(MsgIds1, MsgIdsTmp),
%% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- [read | ?WRITE_MODE]),
+ {ok, MainHdl} = rabbit_msg_store_misc:open_file(
+ Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]),
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
%% file.
@@ -804,8 +790,10 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% Extend the main file as big as necessary in a single
%% move. If we run out of disk space, this truncate could
%% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ MainHdl, Top, Top + TmpSize),
+ {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
+ Dir, TmpFileName, ?READ_AHEAD_MODE),
{ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
ok = file_handle_cache:close(MainHdl),
ok = file_handle_cache:delete(TmpHdl),
@@ -827,22 +815,10 @@ is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
+ {ok, Messages, _FileSize} =
+ rabbit_msg_store_misc:scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
-scan_file_for_valid_messages(Dir, FileName) ->
- case open_file(Dir, FileName, ?READ_MODE) of
- {ok, Hdl} ->
- Valid = rabbit_msg_file:scan(Hdl),
- %% if something really bad's happened, the close could fail,
- %% but ignore
- file_handle_cache:close(Hdl),
- Valid;
- {error, enoent} -> {ok, [], 0};
- {error, Reason} -> throw({error,
- {unable_to_scan_file, FileName, Reason}})
- end.
-
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -877,7 +853,8 @@ build_index(Left, [File|Files],
sum_valid_data = SumValid,
sum_file_size = SumFileSize }) ->
{ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ rabbit_msg_store_misc:scan_file_for_valid_messages(
+ Dir, rabbit_msg_store_misc:filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -922,262 +899,93 @@ build_index(Left, [File|Files],
maybe_roll_to_new_file(Offset,
State = #msstate { dir = Dir,
- file_size_limit = FileSizeLimit,
current_file_handle = CurHdl,
current_file = CurFile,
file_summary = FileSummary })
- when Offset >= FileSizeLimit ->
+ when Offset >= ?FILE_SIZE_LIMIT ->
State1 = sync(State),
ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
- {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
- true = ets:update_element(FileSummary, CurFile,
- {#file_summary.right, NextFile}),
+ {ok, NextHdl} = rabbit_msg_store_misc:open_file(
+ Dir, rabbit_msg_store_misc:filenum_to_name(NextFile),
+ ?WRITE_MODE),
true = ets:insert_new(
FileSummary, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
left = CurFile, right = undefined, file_size = 0,
locked = false }),
+ true = ets:update_element(FileSummary, CurFile,
+ {#file_summary.right, NextFile}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile };
maybe_roll_to_new_file(_, State) ->
State.
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ file_summary = FileSummary,
+ gc_running = false })
+ when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
+ First = ets:first(FileSummary),
+ N = random_distributions:geometric(?GEOMETRIC_P),
+ case find_files_to_gc(FileSummary, N, First) of
+ undefined ->
+ State;
+ {Source, Dest} ->
+ State1 = close_handle(Source, close_handle(Dest, State)),
+ true = ets:update_element(FileSummary, Source,
+ {#file_summary.locked, true}),
+ true = ets:update_element(FileSummary, Dest,
+ {#file_summary.locked, true}),
+ ok = rabbit_msg_store_gc:gc(Source, Dest),
+ State1 #msstate { gc_running = true }
+ end;
maybe_compact(State) ->
- {_Bool, State1} = maybe_compact1(State),
- State1.
+ State.
-maybe_compact1(State = #msstate { sum_valid_data = SumValid,
- sum_file_size = SumFileSize,
- gc_pid = undefined,
- file_summary = FileSummary })
- when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
- %% Pid = spawn_link(fun() ->
- %% io:format("GC process!~n")
- %% %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}),
- %% end),
- %% State #msstate { gc_pid = Pid };
- {true, State};
-maybe_compact1(State) ->
- {false, State}.
-
-compact(Files, State) ->
- %% smallest number, hence eldest, hence left-most, first
- SortedFiles = lists:sort(Files),
- %% foldl reverses, so now youngest/right-most first
- {RemainingFiles, State1} =
- lists:foldl(fun (File, {Acc, State2}) ->
- case delete_file_if_empty(File, State2) of
- {true, State3} -> {Acc, State3};
- {false, State3} -> {[File | Acc], State3}
- end
- end, {[], State}, SortedFiles),
- lists:foldl(fun combine_file/2, State1, lists:reverse(RemainingFiles)).
-
-%% At this stage, we simply know that the file has had msgs removed
-%% from it. However, we don't know if we need to merge it left (which
-%% is what we would prefer), or merge it right. If we merge left, then
-%% this file is the source, and the left file is the destination. If
-%% we merge right then this file is the destination and the right file
-%% is the source.
-combine_file(File, State = #msstate { file_summary = FileSummary,
- current_file = CurFile }) ->
- %% the file we're looking at may no longer exist as it may have
- %% been deleted within the current GC run
- case ets:lookup(FileSummary, File) of
- [] -> State;
- [FSEntry = #file_summary { left = Left, right = Right }] ->
- GoRight =
- fun() ->
- case Right of
- undefined -> State;
- _ when not (CurFile == Right) ->
- [FSRight] = ets:lookup(FileSummary, Right),
- {_, State1} = adjust_meta_and_combine(
- FSEntry, FSRight, State),
- State1;
- _ -> State
- end
- end,
- case Left of
- undefined ->
- GoRight();
- _ -> [FSLeft] = ets:lookup(FileSummary, Left),
- case adjust_meta_and_combine(FSLeft, FSEntry, State) of
- {true, State1} -> State1;
- {false, State} -> GoRight()
- end
- end
+find_files_to_gc(_FileSummary, _N, '$end_of_table') ->
+ undefined;
+find_files_to_gc(FileSummary, N, First) ->
+ [FirstObj = #file_summary { right = Right }] =
+ ets:lookup(FileSummary, First),
+ Pairs =
+ find_files_to_gc(FileSummary, N, FirstObj,
+ ets:lookup(FileSummary, Right), []),
+ case Pairs of
+ [] -> undefined;
+ [Pair] -> Pair;
+ _ -> M = 1 + (N rem length(Pairs)),
+ lists:nth(M, Pairs)
end.
-adjust_meta_and_combine(
- LeftObj = #file_summary {
- file = LeftFile, valid_total_size = LeftValidData, right = RightFile,
- file_size = LeftFileSize, locked = true },
- RightObj = #file_summary {
- file = RightFile, valid_total_size = RightValidData, left = LeftFile,
- right = RightRight, file_size = RightFileSize, locked = true },
- State) ->
- TotalValidData = LeftValidData + RightValidData,
- {NewMsgLocs, State1} = combine_files(RightObj, LeftObj, State),
- %% %% this could fail if RightRight is undefined
- %% ets:update_element(FileSummary, RightRight,
- %% {#file_summary.left, LeftFile}),
- %% true = ets:delete(FileSummary, RightFile),
- LeftObj1 = LeftObj #file_summary {
- valid_total_size = TotalValidData,
- contiguous_top = TotalValidData,
- file_size = TotalValidData,
- right = RightRight },
- {RightFile, LeftObj1, NewMsgLocs,
- TotalValidData - LeftFileSize - RightFileSize,
- State1}.
-
-combine_files(#file_summary { file = Source,
- valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
- valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
- right = Source },
- State = #gcstate { dir = Dir }) ->
- State1 = close_handle(Source, close_handle(Destination, State)),
- SourceName = filenum_to_name(Source),
- DestinationName = filenum_to_name(Destination),
- {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE),
- {ok, DestinationHdl} = open_file(Dir, DestinationName,
- ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ExpectedSize = SourceValid + DestinationValid,
- %% if DestinationValid =:= DestinationContiguousTop then we don't
- %% need a tmp file
- %% if they're not equal, then we need to write out everything past
- %% the DestinationContiguousTop to a tmp file then truncate,
- %% copy back in, and then copy over from Source
- %% otherwise we just truncate straight away and copy over from Source
- NewDestLocs =
- if DestinationContiguousTop =:= DestinationValid ->
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize),
- [];
- true ->
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset ==
- %% DestinationContiguousTop because if
- %% it was then DestinationContiguousTop
- %% would have been extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I
- %% suspect that the list should be
- %% naturally sorted as we require,
- %% however, we need to enforce it anyway
- end,
- find_unremoved_messages_in_file(Destination, State1)),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} =
- open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- {ok, NewDestLocs1} =
- copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage
- %% from Destination, and NewDestLocs1 contains
- %% msg_locations reflecting the compaction of
- %% Destination so truncate Destination and copy from
- %% Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:close(TmpHdl),
- ok = file:delete(form_filename(Dir, Tmp)),
- NewDestLocs1
- end,
- SourceWorkList = find_unremoved_messages_in_file(Source, State1),
- {ok, NewSourceLocs} =
- copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination),
- %% tidy up
- ok = file_handle_cache:close(SourceHdl),
- ok = file_handle_cache:close(DestinationHdl),
- ok = file:delete(form_filename(Dir, SourceName)),
- {[NewDestLocs, NewSourceLocs], State1}.
-
-find_unremoved_messages_in_file(File, State = #gcstate { dir = Dir }) ->
- %% Msgs here will be end-of-file at start-of-list
- {ok, Messages, _FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- %% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(
- fun ({MsgId, _TotalSize, _Offset}, Acc) ->
- case index_lookup(MsgId, State) of
- Entry = #msg_location { file = File } -> [ Entry | Acc ];
- _ -> Acc
- end
- end, [], Messages).
-
-copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination) ->
- {FinalOffset, BlockStart1, BlockEnd1, NewMsgLocations} =
- lists:foldl(
- fun (StoreEntry = #msg_location { offset = Offset,
- total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd, NewMsgLocs}) ->
- %% CurOffset is in the DestinationFile.
- %% Offset, BlockStart and BlockEnd are in the SourceFile
- %% update MsgLocation to reflect change of file and offset
- NewMsgLocs1 =
- [StoreEntry #msg_location {
- file = Destination,
- offset = CurOffset } | NewMsgLocs],
- NextOffset = CurOffset + TotalSize,
- {BlockStart2, BlockEnd2} =
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the
- %% next msg follows straight on
- {BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work
- %% for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file_handle_cache:position(SourceHdl,
- BlockStart),
- {ok, BSize} = file_handle_cache:copy(
- SourceHdl, DestinationHdl, BSize),
- {Offset, Offset + TotalSize}
- end,
- {NextOffset, BlockStart2, BlockEnd2, NewMsgLocs1}
- end, {InitOffset, undefined, undefined, []}, WorkList),
- case WorkList of
- [] ->
- ok;
- _ ->
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} =
- file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} =
- file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file_handle_cache:sync(DestinationHdl)
- end,
- {ok, NewMsgLocations}.
+find_files_to_gc(_FileSummary, _N, #file_summary {}, [], Pairs) ->
+ lists:reverse(Pairs);
+find_files_to_gc(FileSummary, N,
+ #file_summary { right = Source, file = Dest,
+ valid_total_size = DestValid },
+ [SourceObj = #file_summary { left = Dest, right = SourceRight,
+ valid_total_size = SourceValid,
+ file = Source }],
+ Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso
+ not is_atom(SourceRight) ->
+ Pair = {Source, Dest},
+ case N == 1 of
+ true -> [Pair];
+ false -> find_files_to_gc(FileSummary, (N - 1), SourceObj,
+ ets:lookup(FileSummary, SourceRight),
+ [Pair | Pairs])
+ end;
+find_files_to_gc(FileSummary, N, _Left,
+ [Right = #file_summary { right = RightRight }], Pairs) ->
+ find_files_to_gc(FileSummary, N, Right,
+ ets:lookup(FileSummary, RightRight), Pairs).
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
sum_file_size = SumFileSize } = State) ->
[#file_summary { valid_total_size = ValidData, file_size = FileSize,
- left = Left, right = Right }] =
+ left = Left, right = Right, locked = false }] =
ets:lookup(FileSummary, File),
case ValidData of
%% we should NEVER find the current file in here hence right
@@ -1188,16 +996,17 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
true = ets:update_element(
FileSummary, Right,
{#file_summary.left, undefined});
- {_, _} when not (is_atom(Right)) ->
+ {_, _} when not is_atom(Right) ->
true = ets:update_element(FileSummary, Right,
{#file_summary.left, Left}),
- true =
- ets:update_element(FileSummary, Left,
- {#file_summary.right, Right})
+ true = ets:update_element(FileSummary, Left,
+ {#file_summary.right, Right})
end,
true = ets:delete(FileSummary, File),
State1 = close_handle(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(
+ Dir,
+ rabbit_msg_store_misc:filenum_to_name(File))),
State1 #msstate { sum_file_size = SumFileSize - FileSize };
_ -> State
end.
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
new file mode 100644
index 00000000..cb13ed86
--- /dev/null
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -0,0 +1,71 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store_ets_index).
+-export([init/0, lookup/2, insert/2, update/2, update_fields/3, delete/2,
+ delete_by_file/2, terminate/1]).
+
+-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
+
+-include("rabbit_msg_store.hrl").
+
+init() ->
+ ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]).
+
+lookup(Key, MsgLocations) ->
+ case ets:lookup(MsgLocations, Key) of
+ [] -> not_found;
+ [Entry] -> Entry
+ end.
+
+insert(Obj, MsgLocations) ->
+ true = ets:insert_new(MsgLocations, Obj),
+ ok.
+
+update(Obj, MsgLocations) ->
+ true = ets:insert(MsgLocations, Obj),
+ ok.
+
+update_fields(Key, Updates, MsgLocations) ->
+ true = ets:update_element(MsgLocations, Key, Updates),
+ ok.
+
+delete(Key, MsgLocations) ->
+ true = ets:delete(MsgLocations, Key),
+ ok.
+
+delete_by_file(File, MsgLocations) ->
+ MatchHead = #msg_location { file = File, _ = '_' },
+ ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
+ ok.
+
+terminate(MsgLocations) ->
+ ets:delete(MsgLocations).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
new file mode 100644
index 00000000..729cd287
--- /dev/null
+++ b/src/rabbit_msg_store_gc.erl
@@ -0,0 +1,249 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store_gc).
+
+-behaviour(gen_server2).
+
+-export([start_link/4, gc/2, stop/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(gcstate,
+ {dir,
+ index_state,
+ file_summary,
+ index_module
+ }).
+
+-include("rabbit_msg_store.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+start_link(Dir, IndexState, FileSummary, IndexModule) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [Dir, IndexState, FileSummary, IndexModule],
+ [{timeout, infinity}]).
+
+gc(Source, Destination) ->
+ gen_server2:cast(?SERVER, {gc, Source, Destination}).
+
+stop() ->
+ gen_server2:call(?SERVER, stop).
+
+%%----------------------------------------------------------------------------
+
+init([Dir, IndexState, FileSummary, IndexModule]) ->
+ {ok, #gcstate { dir = Dir, index_state = IndexState,
+ file_summary = FileSummary, index_module = IndexModule },
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast({gc, Source, Destination}, State) ->
+ Reclaimed = adjust_meta_and_combine(Source, Destination, State),
+ ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
+ {noreply, State, hibernate}.
+
+handle_info(Info, State) ->
+ {stop, {unhandled_info, Info}, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+adjust_meta_and_combine(SourceFile, DestFile,
+ State = #gcstate { file_summary = FileSummary }) ->
+
+ [SourceObj = #file_summary {
+ valid_total_size = SourceValidData, left = DestFile,
+ file_size = SourceFileSize, locked = true }] =
+ ets:lookup(FileSummary, SourceFile),
+ [DestObj = #file_summary {
+ valid_total_size = DestValidData, right = SourceFile,
+ file_size = DestFileSize, locked = true }] =
+ ets:lookup(FileSummary, DestFile),
+
+ TotalValidData = DestValidData + SourceValidData,
+ ok = combine_files(SourceObj, DestObj, State),
+ %% don't update dest.right, because it could be changing at the same time
+ true =
+ ets:update_element(FileSummary, DestFile,
+ [{#file_summary.valid_total_size, TotalValidData},
+ {#file_summary.contiguous_top, TotalValidData},
+ {#file_summary.file_size, TotalValidData}]),
+ SourceFileSize + DestFileSize - TotalValidData.
+
+combine_files(#file_summary { file = Source,
+ valid_total_size = SourceValid,
+ left = Destination },
+ #file_summary { file = Destination,
+ valid_total_size = DestinationValid,
+ contiguous_top = DestinationContiguousTop,
+ right = Source },
+ State = #gcstate { dir = Dir }) ->
+ SourceName = rabbit_msg_store_misc:filenum_to_name(Source),
+ DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination),
+ {ok, SourceHdl} =
+ rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE),
+ {ok, DestinationHdl} =
+ rabbit_msg_store_misc:open_file(Dir, DestinationName,
+ ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ExpectedSize = SourceValid + DestinationValid,
+    %% If DestinationValid =:= DestinationContiguousTop then no tmp
+    %% file is needed: we can truncate straight away and then copy
+    %% over from Source. If they are not equal, we must first write
+    %% everything past DestinationContiguousTop out to a tmp file,
+    %% truncate Destination, copy the tmp file back in, and only
+    %% then copy over from Source.
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ DestinationHdl, DestinationValid, ExpectedSize);
+ true ->
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+                      %% Given the expected access patterns, the
+                      %% list is probably already in the order we
+                      %% require; we cannot rely on that, though,
+                      %% so we enforce the ordering anyway
+ end,
+ find_unremoved_messages_in_file(Destination, State)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
+ Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:close(TmpHdl),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp))
+ end,
+ SourceWorkList = find_unremoved_messages_in_file(Source, State),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file_handle_cache:close(SourceHdl),
+ ok = file_handle_cache:close(DestinationHdl),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)),
+ ok.
+
+find_unremoved_messages_in_file(File, #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = Index }) ->
+    %% Msgs are returned with those nearest the end of the file first
+ {ok, Messages, _FileSize} =
+ rabbit_msg_store_misc:scan_file_for_valid_messages(
+ Dir, rabbit_msg_store_misc:filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case Index:lookup(MsgId, IndexState) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, #gcstate { index_module = Index,
+ index_state = IndexState }) ->
+ {FinalOffset, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
+ total_size = TotalSize },
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ %% update MsgLocation to reflect change of file and offset
+ ok = Index:update_fields(MsgId,
+ [{#msg_location.file, Destination},
+ {#msg_location.offset, CurOffset}],
+ IndexState),
+ {BlockStart2, BlockEnd2} =
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work
+ %% for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl,
+ BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
+ {Offset, Offset + TotalSize}
+ end,
+ {CurOffset + TotalSize, BlockStart2, BlockEnd2}
+ end, {InitOffset, undefined, undefined}, WorkList),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
+ ok.
diff --git a/src/rabbit_msg_store_misc.erl b/src/rabbit_msg_store_misc.erl
new file mode 100644
index 00000000..cf76cf21
--- /dev/null
+++ b/src/rabbit_msg_store_misc.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store_misc).
+
+-export([open_file/3, preallocate/3, truncate_and_extend_file/3,
+ form_filename/2, filenum_to_name/1, scan_file_for_valid_messages/2]).
+
+-include("rabbit_msg_store.hrl").
+
+
+%%----------------------------------------------------------------------------
+
+open_file(Dir, FileName, Mode) ->
+ file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+
+%%----------------------------------------------------------------------------
+
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
+ ok = file_handle_cache:truncate(Hdl),
+ {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
+ ok.
+
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
+ ok = file_handle_cache:truncate(FileHdl),
+ ok = preallocate(FileHdl, Highpoint, Lowpoint).
+
+form_filename(Dir, Name) -> filename:join(Dir, Name).
+
+filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
+
+scan_file_for_valid_messages(Dir, FileName) ->
+ case open_file(Dir, FileName, ?READ_MODE) of
+ {ok, Hdl} ->
+ Valid = rabbit_msg_file:scan(Hdl),
+            %% if something has gone really badly wrong, the close could
+            %% fail, but we deliberately ignore that here
+ file_handle_cache:close(Hdl),
+ Valid;
+ {error, enoent} -> {ok, [], 0};
+ {error, Reason} -> throw({error,
+ {unable_to_scan_file, FileName, Reason}})
+ end.