diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-12-20 03:43:21 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-12-20 03:43:21 +0000 |
commit | 5307e4c6a7f21458fda7fb97020dd838005046e2 (patch) | |
tree | 7989c1dcdd689a11903ea8475cf74776039c486d | |
parent | 6d3e84b4d013b0dc306927324de34b0d41133c84 (diff) | |
download | rabbitmq-server-5307e4c6a7f21458fda7fb97020dd838005046e2.tar.gz |
background lazy GC in and working
-rw-r--r-- | include/rabbit_msg_store.hrl | 51 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 523 | ||||
-rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 71 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 249 | ||||
-rw-r--r-- | src/rabbit_msg_store_misc.erl | 74 |
5 files changed, 611 insertions, 357 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl new file mode 100644 index 00000000..925d5d8e --- /dev/null +++ b/include/rabbit_msg_store.hrl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-record(msg_location, + {msg_id, ref_count, file, offset, total_size}). + +-record(file_summary, + {file, valid_total_size, contiguous_top, left, right, file_size, + locked}). + +-define(BINARY_MODE, [raw, binary]). +-define(READ_MODE, [read]). +-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). +-define(WRITE_MODE, [write]). + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). 
+ +-define(FILE_SIZE_LIMIT, (16*1024*1024)). + +-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c8d27ba6..f40c6270 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,16 +36,16 @@ -export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, sync/2]). --export([sync/0]). %% internal +-export([sync/0, gc_done/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). -define(SERVER, ?MODULE). --define(FILE_SIZE_LIMIT, (16*1024*1024)). -define(SYNC_INTERVAL, 5). %% milliseconds --define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB + +-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng %%---------------------------------------------------------------------------- @@ -54,6 +54,7 @@ -type(msg_id() :: binary()). -type(msg() :: any()). -type(file_path() :: any()). +-type(file_num() :: non_neg_integer()). -spec(start_link/3 :: (file_path(), @@ -65,6 +66,7 @@ -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). +-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). -endif. @@ -72,12 +74,12 @@ -record(msstate, {dir, %% store directory - msg_locations, %% where are messages? + index_module, %% the module for index ops + index_state, %% where are messages? file_summary, %% what's in the files? current_file, %% current file name as number current_file_handle, %% current file handle %% since the last fsync? - file_size_limit, %% how big can our files get? 
file_handle_cache, %% file handle cache on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer @@ -85,36 +87,17 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_pid %% pid of the GC process + gc_running %% is the GC currently working? }). --record(msg_location, - {msg_id, ref_count, file, offset, total_size}). - --record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size, - locked}). - --record(gcstate, - {dir - }). +-include("rabbit_msg_store.hrl"). --define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). --define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). --define(FILE_EXTENSION, ".rdq"). --define(FILE_EXTENSION_TMP, ".rdt"). --define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). +-define(CACHE_ETS_NAME, rabbit_msg_store_cache). %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION +%% It is not recommended to set this to < 0.5 -define(GARBAGE_FRACTION, 0.5). --define(BINARY_MODE, [raw, binary]). --define(READ_MODE, [read]). --define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). --define(WRITE_MODE, [write]). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - %% The components: %% %% MsgLocation: this is an ets table which contains: @@ -249,6 +232,8 @@ remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +gc_done(Reclaimed, Source, Destination) -> + gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}). 
%%---------------------------------------------------------------------------- %% gen_server callbacks @@ -259,21 +244,21 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - MsgLocations = ets:new(?MSG_LOC_NAME, - [set, protected, {keypos, #msg_location.msg_id}]), + IndexModule = rabbit_msg_store_ets_index, + IndexState = IndexModule:init(), InitFile = 0, FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, - [ordered_set, protected, + [ordered_set, public, {keypos, #file_summary.file}]), MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]), State = #msstate { dir = Dir, - msg_locations = MsgLocations, + index_module = IndexModule, + index_state = IndexState, file_summary = FileSummary, current_file = InitFile, current_file_handle = undefined, - file_size_limit = ?FILE_SIZE_LIMIT, file_handle_cache = dict:new(), on_sync = [], sync_timer_ref = undefined, @@ -281,7 +266,7 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> sum_valid_data = 0, sum_file_size = 0, pending_gc_completion = [], - gc_pid = undefined + gc_running = false }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), @@ -297,11 +282,15 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> build_index(Files, State), %% read is only needed so that we can seek - {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile), - [read | ?WRITE_MODE]), + {ok, FileHdl} = rabbit_msg_store_misc:open_file( + Dir, rabbit_msg_store_misc:filenum_to_name(CurFile), + [read | ?WRITE_MODE]), {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), + {ok, _Pid} = rabbit_msg_store_gc:start_link( + Dir, IndexState, FileSummary, IndexModule), + {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
@@ -355,10 +344,12 @@ handle_cast({write, MsgId, Msg}, { sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize } ))); - StoreEntry = #msg_location { ref_count = RefCount } -> - %% We already know about it, just update counter - ok = index_update(StoreEntry #msg_location { - ref_count = RefCount + 1 }, State), + #msg_location { ref_count = RefCount } -> + %% We already know about it, just update counter. Only + %% update field otherwise bad interaction with concurrent GC + ok = index_update_fields(MsgId, + {#msg_location.ref_count, RefCount + 1}, + State), noreply(State) end; @@ -388,15 +379,27 @@ handle_cast({sync, MsgIds, K}, end; handle_cast(sync, State) -> - noreply(sync(State)). + noreply(sync(State)); -%% handle_cast({gc_finished, GCPid, RemainingFile, DeletedFile, MsgLocations}, -%% State = #msstate { file_summary = FileSummary, -%% gc_pid = GCPid }) -> -%% true = ets:delete(FileSummary, DeletedFile), -%% true = ets:insert(FileSummary, RemainingFile), -%% State1 = lists:foldl(fun index_insert/2, State, MsgLocations), -%% noreply(maybe_compact(run_pending(State1))). +handle_cast({gc_done, Reclaimed, Source, Dest}, + State = #msstate { sum_file_size = SumFileSize, + gc_running = true, + file_summary = FileSummary }) -> + %% we always move data left, so Source has gone and was on the + %% right, so need to make dest = source.right.left, and also + %% dest.right = source.right + [#file_summary { left = Dest, right = SourceRight, locked = true }] = + ets:lookup(FileSummary, Source), + %% this could fail if SourceRight == undefined + ets:update_element(FileSummary, SourceRight, + {#file_summary.left, Dest}), + true = ets:update_element(FileSummary, Dest, + [{#file_summary.locked, false}, + {#file_summary.right, SourceRight}]), + true = ets:delete(FileSummary, Source), + noreply(run_pending( + State #msstate { sum_file_size = SumFileSize - Reclaimed, + gc_running = false })). 
handle_info(timeout, State) -> noreply(sync(State)); @@ -408,9 +411,13 @@ handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(_Reason, State = #msstate { msg_locations = MsgLocations, +terminate(_Reason, State = #msstate { index_state = IndexState, + index_module = IndexModule, file_summary = FileSummary, current_file_handle = FileHdl }) -> + %% stop the gc first, otherwise it could be working and we pull + %% out the ets tables from under it. + ok = rabbit_msg_store_gc:stop(), State1 = case FileHdl of undefined -> State; _ -> State2 = sync(State), @@ -418,9 +425,9 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations, State2 end, State3 = close_all_handles(State1), - ets:delete(MsgLocations), ets:delete(FileSummary), - State3 #msstate { msg_locations = undefined, + IndexModule:terminate(IndexState), + State3 #msstate { index_state = undefined, file_summary = undefined, current_file_handle = undefined }. @@ -428,11 +435,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. handle_pre_hibernate(State) -> - {Result, State1} = maybe_compact1(State), - {case Result of - true -> insomniate; - false -> hibernate - end, State1}. + {hibernate, maybe_compact(State)}. %%---------------------------------------------------------------------------- %% general helper functions @@ -465,27 +468,12 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -form_filename(Dir, Name) -> filename:join(Dir, Name). - -filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. - filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). sort_file_names(FileNames) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, FileNames). 
-preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), - ok = file_handle_cache:truncate(Hdl), - {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), - ok. - -truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> - {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint), - ok = file_handle_cache:truncate(FileHdl), - ok = preallocate(FileHdl, Highpoint, Lowpoint). - sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs }) -> State1 = stop_sync_timer(State), @@ -556,8 +544,8 @@ read_message(MsgId, State = remove_message(MsgId, State = #msstate { file_summary = FileSummary, sum_valid_data = SumValid }) -> - StoreEntry = #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = + #msg_location { ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize } = index_lookup(MsgId, State), case RefCount of 1 -> @@ -582,18 +570,22 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary, end; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), - ok = index_update(StoreEntry #msg_location - { ref_count = RefCount - 1 }, State), + %% only update field, otherwise bad interaction with concurrent GC + ok = index_update_fields(MsgId, + {#msg_location.ref_count, RefCount - 1}, + State), State end. add_to_pending_gc_completion( Op, State = #msstate { pending_gc_completion = Pending }) -> - State #msstate { pending_gc_completion = [Op, Pending] }. + State #msstate { pending_gc_completion = [Op | Pending] }. +run_pending(State = #msstate { pending_gc_completion = [] }) -> + State; run_pending(State = #msstate { pending_gc_completion = Pending }) -> State1 = State #msstate { pending_gc_completion = [] }, - lists:foldl(fun run_pending/2, State1, Pending). + lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). 
run_pending({read, MsgId, From}, State) -> case read_message(MsgId, State) of @@ -622,19 +614,16 @@ close_all_handles(State = #msstate { file_handle_cache = FHC }) -> get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) -> case dict:find(FileNum, FHC) of {ok, Hdl} -> {Hdl, State}; - error -> new_handle(FileNum, filenum_to_name(FileNum), + error -> new_handle(FileNum, + rabbit_msg_store_misc:filenum_to_name(FileNum), [read | ?BINARY_MODE], State) end. new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC, dir = Dir }) -> - {ok, Hdl} = open_file(Dir, FileName, Mode), + {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode), {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}. -open_file(Dir, FileName, Mode) -> - file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). - %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -673,28 +662,25 @@ insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) -> %% index %%---------------------------------------------------------------------------- -index_lookup(Key, #msstate { msg_locations = MsgLocations }) -> - case ets:lookup(MsgLocations, Key) of - [] -> not_found; - [Entry] -> Entry - end. +index_lookup(Key, #msstate { index_module = Index, index_state = State }) -> + Index:lookup(Key, State). -index_insert(Obj, #msstate { msg_locations = MsgLocations }) -> - true = ets:insert_new(MsgLocations, Obj), - ok. +index_insert(Obj, #msstate { index_module = Index, index_state = State }) -> + Index:insert(Obj, State). -index_update(Obj, #msstate { msg_locations = MsgLocations }) -> - true = ets:insert(MsgLocations, Obj), - ok. +index_update(Obj, #msstate { index_module = Index, index_state = State }) -> + Index:update(Obj, State). 
-index_delete(Key, #msstate { msg_locations = MsgLocations }) -> - true = ets:delete(MsgLocations, Key), - ok. +index_update_fields(Key, Updates, + #msstate { index_module = Index, index_state = State }) -> + Index:update_fields(Key, Updates, State). -index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) -> - MatchHead = #msg_location { file = File, _ = '_' }, - ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]), - ok. +index_delete(Key, #msstate { index_module = Index, index_state = State }) -> + Index:delete(Key, State). + +index_delete_by_file(File, #msstate { index_module = Index, + index_state = State }) -> + Index:delete_by_file(File, State). %%---------------------------------------------------------------------------- %% recovery @@ -762,7 +748,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% consist only of valid messages. Plan: Truncate the main file %% back to before any of the files in the tmp file and copy %% them over again - TmpPath = form_filename(Dir, TmpFileName), + TmpPath = rabbit_msg_store_misc:form_filename(Dir, TmpFileName), case is_sublist(MsgIdsTmp, MsgIds) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file %% note this also catches the case when the tmp file @@ -794,8 +780,8 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% are in the tmp file true = is_disjoint(MsgIds1, MsgIdsTmp), %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - [read | ?WRITE_MODE]), + {ok, MainHdl} = rabbit_msg_store_misc:open_file( + Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]), %% Wipe out any rubbish at the end of the file. Remember %% the head of the list will be the highest entry in the %% file. @@ -804,8 +790,10 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% Extend the main file as big as necessary in a single %% move. 
If we run out of disk space, this truncate could %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), + ok = rabbit_msg_store_misc:truncate_and_extend_file( + MainHdl, Top, Top + TmpSize), + {ok, TmpHdl} = rabbit_msg_store_misc:open_file( + Dir, TmpFileName, ?READ_AHEAD_MODE), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), ok = file_handle_cache:close(MainHdl), ok = file_handle_cache:delete(TmpHdl), @@ -827,22 +815,10 @@ is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). scan_file_for_valid_messages_msg_ids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), + {ok, Messages, _FileSize} = + rabbit_msg_store_misc:scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. -scan_file_for_valid_messages(Dir, FileName) -> - case open_file(Dir, FileName, ?READ_MODE) of - {ok, Hdl} -> - Valid = rabbit_msg_file:scan(Hdl), - %% if something really bad's happened, the close could fail, - %% but ignore - file_handle_cache:close(Hdl), - Valid; - {error, enoent} -> {ok, [], 0}; - {error, Reason} -> throw({error, - {unable_to_scan_file, FileName, Reason}}) - end. - %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. 
@@ -877,7 +853,8 @@ build_index(Left, [File|Files], sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), + rabbit_msg_store_misc:scan_file_for_valid_messages( + Dir, rabbit_msg_store_misc:filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -922,262 +899,93 @@ build_index(Left, [File|Files], maybe_roll_to_new_file(Offset, State = #msstate { dir = Dir, - file_size_limit = FileSizeLimit, current_file_handle = CurHdl, current_file = CurFile, file_summary = FileSummary }) - when Offset >= FileSizeLimit -> + when Offset >= ?FILE_SIZE_LIMIT -> State1 = sync(State), ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, - {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE), - true = ets:update_element(FileSummary, CurFile, - {#file_summary.right, NextFile}), + {ok, NextHdl} = rabbit_msg_store_misc:open_file( + Dir, rabbit_msg_store_misc:filenum_to_name(NextFile), + ?WRITE_MODE), true = ets:insert_new( FileSummary, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, locked = false }), + true = ets:update_element(FileSummary, CurFile, + {#file_summary.right, NextFile}), State1 #msstate { current_file_handle = NextHdl, current_file = NextFile }; maybe_roll_to_new_file(_, State) -> State. 
+maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary = FileSummary, + gc_running = false }) + when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> + First = ets:first(FileSummary), + N = random_distributions:geometric(?GEOMETRIC_P), + case find_files_to_gc(FileSummary, N, First) of + undefined -> + State; + {Source, Dest} -> + State1 = close_handle(Source, close_handle(Dest, State)), + true = ets:update_element(FileSummary, Source, + {#file_summary.locked, true}), + true = ets:update_element(FileSummary, Dest, + {#file_summary.locked, true}), + ok = rabbit_msg_store_gc:gc(Source, Dest), + State1 #msstate { gc_running = true } + end; maybe_compact(State) -> - {_Bool, State1} = maybe_compact1(State), - State1. + State. -maybe_compact1(State = #msstate { sum_valid_data = SumValid, - sum_file_size = SumFileSize, - gc_pid = undefined, - file_summary = FileSummary }) - when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> - %% Pid = spawn_link(fun() -> - %% io:format("GC process!~n") - %% %% gen_server2:pcast(?SERVER, 9, {gc_finished, self(),}), - %% end), - %% State #msstate { gc_pid = Pid }; - {true, State}; -maybe_compact1(State) -> - {false, State}. - -compact(Files, State) -> - %% smallest number, hence eldest, hence left-most, first - SortedFiles = lists:sort(Files), - %% foldl reverses, so now youngest/right-most first - {RemainingFiles, State1} = - lists:foldl(fun (File, {Acc, State2}) -> - case delete_file_if_empty(File, State2) of - {true, State3} -> {Acc, State3}; - {false, State3} -> {[File | Acc], State3} - end - end, {[], State}, SortedFiles), - lists:foldl(fun combine_file/2, State1, lists:reverse(RemainingFiles)). - -%% At this stage, we simply know that the file has had msgs removed -%% from it. However, we don't know if we need to merge it left (which -%% is what we would prefer), or merge it right. 
If we merge left, then -%% this file is the source, and the left file is the destination. If -%% we merge right then this file is the destination and the right file -%% is the source. -combine_file(File, State = #msstate { file_summary = FileSummary, - current_file = CurFile }) -> - %% the file we're looking at may no longer exist as it may have - %% been deleted within the current GC run - case ets:lookup(FileSummary, File) of - [] -> State; - [FSEntry = #file_summary { left = Left, right = Right }] -> - GoRight = - fun() -> - case Right of - undefined -> State; - _ when not (CurFile == Right) -> - [FSRight] = ets:lookup(FileSummary, Right), - {_, State1} = adjust_meta_and_combine( - FSEntry, FSRight, State), - State1; - _ -> State - end - end, - case Left of - undefined -> - GoRight(); - _ -> [FSLeft] = ets:lookup(FileSummary, Left), - case adjust_meta_and_combine(FSLeft, FSEntry, State) of - {true, State1} -> State1; - {false, State} -> GoRight() - end - end +find_files_to_gc(_FileSummary, _N, '$end_of_table') -> + undefined; +find_files_to_gc(FileSummary, N, First) -> + [FirstObj = #file_summary { right = Right }] = + ets:lookup(FileSummary, First), + Pairs = + find_files_to_gc(FileSummary, N, FirstObj, + ets:lookup(FileSummary, Right), []), + case Pairs of + [] -> undefined; + [Pair] -> Pair; + _ -> M = 1 + (N rem length(Pairs)), + lists:nth(M, Pairs) end. 
-adjust_meta_and_combine( - LeftObj = #file_summary { - file = LeftFile, valid_total_size = LeftValidData, right = RightFile, - file_size = LeftFileSize, locked = true }, - RightObj = #file_summary { - file = RightFile, valid_total_size = RightValidData, left = LeftFile, - right = RightRight, file_size = RightFileSize, locked = true }, - State) -> - TotalValidData = LeftValidData + RightValidData, - {NewMsgLocs, State1} = combine_files(RightObj, LeftObj, State), - %% %% this could fail if RightRight is undefined - %% ets:update_element(FileSummary, RightRight, - %% {#file_summary.left, LeftFile}), - %% true = ets:delete(FileSummary, RightFile), - LeftObj1 = LeftObj #file_summary { - valid_total_size = TotalValidData, - contiguous_top = TotalValidData, - file_size = TotalValidData, - right = RightRight }, - {RightFile, LeftObj1, NewMsgLocs, - TotalValidData - LeftFileSize - RightFileSize, - State1}. - -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, - right = Source }, - State = #gcstate { dir = Dir }) -> - State1 = close_handle(Source, close_handle(Destination, State)), - SourceName = filenum_to_name(Source), - DestinationName = filenum_to_name(Destination), - {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), - {ok, DestinationHdl} = open_file(Dir, DestinationName, - ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, - %% if DestinationValid =:= DestinationContiguousTop then we don't - %% need a tmp file - %% if they're not equal, then we need to write out everything past - %% the DestinationContiguousTop to a tmp file then truncate, - %% copy back in, and then copy over from Source - %% otherwise we just truncate straight away and copy over from Source - NewDestLocs = - if DestinationContiguousTop =:= DestinationValid -> - ok = 
truncate_and_extend_file(DestinationHdl, - DestinationValid, ExpectedSize), - []; - true -> - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == - %% DestinationContiguousTop because if - %% it was then DestinationContiguousTop - %% would have been extended by TotalSize - Offset < DestinationContiguousTop - %% Given expected access patterns, I - %% suspect that the list should be - %% naturally sorted as we require, - %% however, we need to enforce it anyway - end, - find_unremoved_messages_in_file(Destination, State1)), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = - open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - {ok, NewDestLocs1} = - copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage - %% from Destination, and NewDestLocs1 contains - %% msg_locations reflecting the compaction of - %% Destination so truncate Destination and copy from - %% Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:close(TmpHdl), - ok = file:delete(form_filename(Dir, Tmp)), - NewDestLocs1 - end, - SourceWorkList = find_unremoved_messages_in_file(Source, State1), - {ok, NewSourceLocs} = - copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination), - %% tidy up - ok = file_handle_cache:close(SourceHdl), - ok = file_handle_cache:close(DestinationHdl), - ok = file:delete(form_filename(Dir, SourceName)), - {[NewDestLocs, 
NewSourceLocs], State1}. - -find_unremoved_messages_in_file(File, State = #gcstate { dir = Dir }) -> - %% Msgs here will be end-of-file at start-of-list - {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl( - fun ({MsgId, _TotalSize, _Offset}, Acc) -> - case index_lookup(MsgId, State) of - Entry = #msg_location { file = File } -> [ Entry | Acc ]; - _ -> Acc - end - end, [], Messages). - -copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination) -> - {FinalOffset, BlockStart1, BlockEnd1, NewMsgLocations} = - lists:foldl( - fun (StoreEntry = #msg_location { offset = Offset, - total_size = TotalSize }, - {CurOffset, BlockStart, BlockEnd, NewMsgLocs}) -> - %% CurOffset is in the DestinationFile. - %% Offset, BlockStart and BlockEnd are in the SourceFile - %% update MsgLocation to reflect change of file and offset - NewMsgLocs1 = - [StoreEntry #msg_location { - file = Destination, - offset = CurOffset } | NewMsgLocs], - NextOffset = CurOffset + TotalSize, - {BlockStart2, BlockEnd2} = - if BlockStart =:= undefined -> - %% base case, called only for the first list elem - {Offset, Offset + TotalSize}; - Offset =:= BlockEnd -> - %% extend the current block because the - %% next msg follows straight on - {BlockStart, BlockEnd + TotalSize}; - true -> - %% found a gap, so actually do the work - %% for the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file_handle_cache:position(SourceHdl, - BlockStart), - {ok, BSize} = file_handle_cache:copy( - SourceHdl, DestinationHdl, BSize), - {Offset, Offset + TotalSize} - end, - {NextOffset, BlockStart2, BlockEnd2, NewMsgLocs1} - end, {InitOffset, undefined, undefined, []}, WorkList), - case WorkList of - [] -> - ok; - _ -> - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = - file_handle_cache:position(SourceHdl, 
BlockStart1), - {ok, BSize1} = - file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), - ok = file_handle_cache:sync(DestinationHdl) - end, - {ok, NewMsgLocations}. +find_files_to_gc(_FileSummary, _N, #file_summary {}, [], Pairs) -> + lists:reverse(Pairs); +find_files_to_gc(FileSummary, N, + #file_summary { right = Source, file = Dest, + valid_total_size = DestValid }, + [SourceObj = #file_summary { left = Dest, right = SourceRight, + valid_total_size = SourceValid, + file = Source }], + Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso + not is_atom(SourceRight) -> + Pair = {Source, Dest}, + case N == 1 of + true -> [Pair]; + false -> find_files_to_gc(FileSummary, (N - 1), SourceObj, + ets:lookup(FileSummary, SourceRight), + [Pair | Pairs]) + end; +find_files_to_gc(FileSummary, N, _Left, + [Right = #file_summary { right = RightRight }], Pairs) -> + find_files_to_gc(FileSummary, N, Right, + ets:lookup(FileSummary, RightRight), Pairs). delete_file_if_empty(File, State = #msstate { current_file = File }) -> State; delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, sum_file_size = SumFileSize } = State) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, - left = Left, right = Right }] = + left = Left, right = Right, locked = false }] = ets:lookup(FileSummary, File), case ValidData of %% we should NEVER find the current file in here hence right @@ -1188,16 +996,17 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, true = ets:update_element( FileSummary, Right, {#file_summary.left, undefined}); - {_, _} when not (is_atom(Right)) -> + {_, _} when not is_atom(Right) -> true = ets:update_element(FileSummary, Right, {#file_summary.left, Left}), - true = - ets:update_element(FileSummary, Left, - {#file_summary.right, Right}) + true = ets:update_element(FileSummary, Left, + {#file_summary.right, Right}) end, true = ets:delete(FileSummary, File), State1 = close_handle(File, State), - 
ok = file:delete(form_filename(Dir, filenum_to_name(File))), + ok = file:delete(rabbit_msg_store_misc:form_filename( + Dir, + rabbit_msg_store_misc:filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; _ -> State end. diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl new file mode 100644 index 00000000..cb13ed86 --- /dev/null +++ b/src/rabbit_msg_store_ets_index.erl @@ -0,0 +1,71 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_ets_index). +-export([init/0, lookup/2, insert/2, update/2, update_fields/3, delete/2, + delete_by_file/2, terminate/1]). + +-define(MSG_LOC_NAME, rabbit_msg_store_ets_index). 
-include("rabbit_msg_store.hrl").

%% Create the ETS table used as the message-location index, keyed on the
%% msg_id field of #msg_location records. Public so that both the store
%% and the GC process can reach it.
init() ->
    ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]).

%% Fetch the #msg_location entry for Key, or not_found if absent.
lookup(Key, Table) ->
    case ets:lookup(Table, Key) of
        [Entry] -> Entry;
        []      -> not_found
    end.

%% Insert a brand-new entry; crashes (badmatch) if the key already exists.
insert(Obj, Table) ->
    true = ets:insert_new(Table, Obj),
    ok.

%% Insert-or-overwrite an entry.
update(Obj, Table) ->
    true = ets:insert(Table, Obj),
    ok.

%% Apply {#msg_location.Field, Value} updates to an existing entry;
%% crashes if the key is absent.
update_fields(Key, Updates, Table) ->
    true = ets:update_element(Table, Key, Updates),
    ok.

%% Remove the entry for Key (a no-op if absent).
delete(Key, Table) ->
    true = ets:delete(Table, Key),
    ok.

%% Remove every entry whose file field matches File; the deletion count
%% is deliberately ignored.
delete_by_file(File, Table) ->
    MatchHead = #msg_location { file = File, _ = '_' },
    ets:select_delete(Table, [{MatchHead, [], [true]}]),
    ok.

%% Drop the whole table.
terminate(Table) ->
    ets:delete(Table).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl new file mode 100644 index 00000000..729cd287 --- /dev/null +++ b/src/rabbit_msg_store_gc.erl @@ -0,0 +1,249 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd.
Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%%

-module(rabbit_msg_store_gc).

-behaviour(gen_server2).

-export([start_link/4, gc/2, stop/0]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Configuration captured once at start-up; the GC keeps no other state.
-record(gcstate,
        {dir,           %% message-store directory
         index_state,   %% opaque state threaded through the index module
         file_summary,  %% shared ETS table of #file_summary records
         index_module   %% module implementing the index operations
        }).

-include("rabbit_msg_store.hrl").

-define(SERVER, ?MODULE).

%%----------------------------------------------------------------------------

%% Start the singleton GC process, registered locally as ?SERVER.
start_link(Dir, IndexState, FileSummary, IndexModule) ->
    gen_server2:start_link({local, ?SERVER}, ?MODULE,
                           [Dir, IndexState, FileSummary, IndexModule],
                           [{timeout, infinity}]).

%% Ask the GC, asynchronously, to merge file Source into file Destination.
gc(Source, Destination) ->
    gen_server2:cast(?SERVER, {gc, Source, Destination}).

%% Synchronously shut the GC down.
stop() ->
    gen_server2:call(?SERVER, stop).

%%----------------------------------------------------------------------------

init([Dir, IndexState, FileSummary, IndexModule]) ->
    State = #gcstate { dir = Dir, index_state = IndexState,
                       file_summary = FileSummary,
                       index_module = IndexModule },
    {ok, State, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
      ?DESIRED_HIBERNATE}}.

handle_call(stop, _From, State) ->
    {stop, normal, ok, State}.

%% Perform the requested merge, then report the number of reclaimed bytes
%% back to the message store.
handle_cast({gc, Source, Destination}, State) ->
    Reclaimed = adjust_meta_and_combine(Source, Destination, State),
    ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
    {noreply, State, hibernate}.

%% No info messages are expected; treat any as fatal rather than silently
%% dropping them.
handle_info(Info, State) ->
    {stop, {unhandled_info, Info}, State}.

terminate(_Reason, State) ->
    State.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%%----------------------------------------------------------------------------

%% Merge SourceFile into DestFile -- both must already be marked locked in
%% the file summary, and must be adjacent (Dest immediately left of
%% Source). Updates DestFile's summary entry and returns the number of
%% bytes reclaimed.
adjust_meta_and_combine(SourceFile, DestFile,
                        State = #gcstate { file_summary = FileSummary }) ->
    [SourceObj = #file_summary {
        valid_total_size = SourceValidData, left = DestFile,
        file_size = SourceFileSize, locked = true }] =
        ets:lookup(FileSummary, SourceFile),
    [DestObj = #file_summary {
        valid_total_size = DestValidData, right = SourceFile,
        file_size = DestFileSize, locked = true }] =
        ets:lookup(FileSummary, DestFile),
    TotalValidData = DestValidData + SourceValidData,
    ok = combine_files(SourceObj, DestObj, State),
    %% don't update dest.right, because it could be changing at the same time
    true = ets:update_element(
             FileSummary, DestFile,
             [{#file_summary.valid_total_size, TotalValidData},
              {#file_summary.contiguous_top, TotalValidData},
              {#file_summary.file_size, TotalValidData}]),
    SourceFileSize + DestFileSize - TotalValidData.

%% Physically append SourceFile's live messages to DestFile, compacting
%% DestFile first if it contains internal gaps; the index is updated as
%% each message moves, and SourceFile is deleted at the end.
combine_files(#file_summary { file = Source,
                              valid_total_size = SourceValid,
                              left = Destination },
              #file_summary { file = Destination,
                              valid_total_size = DestinationValid,
                              contiguous_top = DestinationContiguousTop,
                              right = Source },
              State = #gcstate { dir = Dir }) ->
    SourceName = rabbit_msg_store_misc:filenum_to_name(Source),
    DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination),
    {ok, SourceHdl} =
        rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE),
    {ok, DestinationHdl} =
        rabbit_msg_store_misc:open_file(Dir, DestinationName,
                                        ?READ_AHEAD_MODE ++ ?WRITE_MODE),
    ExpectedSize = SourceValid + DestinationValid,
    %% if DestinationValid =:= DestinationContiguousTop then we don't
    %% need a tmp file
    %% if they're not equal, then we need to write out everything past
    %% the DestinationContiguousTop to a tmp file then truncate,
    %% copy back in, and then copy over from Source
    %% otherwise we just truncate straight away and copy over from Source
    if DestinationContiguousTop =:= DestinationValid ->
            ok = rabbit_msg_store_misc:truncate_and_extend_file(
                   DestinationHdl, DestinationValid, ExpectedSize);
       true ->
            Worklist =
                lists:dropwhile(
                  fun (#msg_location { offset = Offset })
                      when Offset /= DestinationContiguousTop ->
                          %% it cannot be that Offset ==
                          %% DestinationContiguousTop because if it
                          %% was then DestinationContiguousTop would
                          %% have been extended by TotalSize
                          Offset < DestinationContiguousTop
                          %% Given expected access patterns, I suspect
                          %% that the list should be naturally sorted
                          %% as we require, however, we need to
                          %% enforce it anyway
                  end,
                  find_unremoved_messages_in_file(Destination, State)),
            Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
            {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
                             Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
            ok = copy_messages(
                   Worklist, DestinationContiguousTop, DestinationValid,
                   DestinationHdl, TmpHdl, Destination, State),
            TmpSize = DestinationValid - DestinationContiguousTop,
            %% so now Tmp contains everything we need to salvage from
            %% Destination, and index_state has been updated to
            %% reflect the compaction of Destination so truncate
            %% Destination and copy from Tmp back to the end
            {ok, 0} = file_handle_cache:position(TmpHdl, 0),
            ok = rabbit_msg_store_misc:truncate_and_extend_file(
                   DestinationHdl, DestinationContiguousTop, ExpectedSize),
            {ok, TmpSize} =
                file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
            %% position in DestinationHdl should now be DestinationValid
            ok = file_handle_cache:sync(DestinationHdl),
            ok = file_handle_cache:close(TmpHdl),
            ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp))
    end,
    SourceWorkList = find_unremoved_messages_in_file(Source, State),
    ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
                       SourceHdl, DestinationHdl, Destination, State),
    %% tidy up
    ok = file_handle_cache:close(SourceHdl),
    ok = file_handle_cache:close(DestinationHdl),
    ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)),
    ok.

%% Return the #msg_location entries of messages in File that are still
%% present in the index, in ascending offset order.
find_unremoved_messages_in_file(File,
                                #gcstate { dir = Dir,
                                           index_state = IndexState,
                                           index_module = Index }) ->
    %% Msgs here will be end-of-file at start-of-list
    {ok, Messages, _FileSize} =
        rabbit_msg_store_misc:scan_file_for_valid_messages(
          Dir, rabbit_msg_store_misc:filenum_to_name(File)),
    %% foldl will reverse so will end up with msgs in ascending offset order
    lists:foldl(
      fun ({MsgId, _TotalSize, _Offset}, Acc) ->
              case Index:lookup(MsgId, IndexState) of
                  Entry = #msg_location { file = File } -> [ Entry | Acc ];
                  _                                     -> Acc
              end
      end, [], Messages).

%% Copy every message in WorkList (ascending offset order, offsets in the
%% source file) into the destination handle starting at InitOffset,
%% coalescing runs of adjacent messages into single copy operations and
%% updating the index entry for each message as it moves. Asserts that the
%% data copied ends exactly at FinalOffset.
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
              Destination, #gcstate { index_module = Index,
                                      index_state = IndexState }) ->
    {FinalOffset, BlockStart1, BlockEnd1} =
        lists:foldl(
          fun (#msg_location { msg_id = MsgId, offset = Offset,
                               total_size = TotalSize },
               {CurOffset, BlockStart, BlockEnd}) ->
                  %% CurOffset is in the DestinationFile.
                  %% Offset, BlockStart and BlockEnd are in the SourceFile
                  %% update MsgLocation to reflect change of file and offset
                  ok = Index:update_fields(
                         MsgId,
                         [{#msg_location.file, Destination},
                          {#msg_location.offset, CurOffset}],
                         IndexState),
                  {BlockStart2, BlockEnd2} =
                      if BlockStart =:= undefined ->
                              %% base case, called only for the first
                              %% list elem
                              {Offset, Offset + TotalSize};
                         Offset =:= BlockEnd ->
                              %% extend the current block because the
                              %% next msg follows straight on
                              {BlockStart, BlockEnd + TotalSize};
                         true ->
                              %% found a gap, so actually do the work
                              %% for the previous block
                              BSize = BlockEnd - BlockStart,
                              {ok, BlockStart} =
                                  file_handle_cache:position(SourceHdl,
                                                             BlockStart),
                              {ok, BSize} = file_handle_cache:copy(
                                              SourceHdl, DestinationHdl,
                                              BSize),
                              {Offset, Offset + TotalSize}
                      end,
                  {CurOffset + TotalSize, BlockStart2, BlockEnd2}
          end, {InitOffset, undefined, undefined}, WorkList),
    case WorkList of
        [] ->
            ok;
        _ ->
            %% do the last remaining block
            BSize1 = BlockEnd1 - BlockStart1,
            {ok, BlockStart1} =
                file_handle_cache:position(SourceHdl, BlockStart1),
            {ok, BSize1} =
                file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
            ok = file_handle_cache:sync(DestinationHdl)
    end,
    ok.
diff --git a/src/rabbit_msg_store_misc.erl b/src/rabbit_msg_store_misc.erl new file mode 100644 index 00000000..cf76cf21 --- /dev/null +++ b/src/rabbit_msg_store_misc.erl @@ -0,0 +1,74 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ.
+%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%%

-module(rabbit_msg_store_misc).

-export([open_file/3, preallocate/3, truncate_and_extend_file/3,
         form_filename/2, filenum_to_name/1, scan_file_for_valid_messages/2]).

-include("rabbit_msg_store.hrl").

%%----------------------------------------------------------------------------

%% Open Dir/FileName through the file-handle cache, always in raw binary
%% mode plus the caller-supplied Mode flags.
%% NOTE(review): ?HANDLE_CACHE_BUFFER_SIZE is not defined in the
%% rabbit_msg_store.hrl shown alongside this change -- presumably defined
%% elsewhere; confirm.
open_file(Dir, FileName, Mode) ->
    file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
                           [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).

%%----------------------------------------------------------------------------

%% Extend the file out to FileSizeLimit (truncate at that position), then
%% leave the handle positioned back at FinalPos.
preallocate(Hdl, FileSizeLimit, FinalPos) ->
    {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
    ok = file_handle_cache:truncate(Hdl),
    {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
    ok.

%% Cut the file back to Lowpoint and preallocate out to Highpoint, leaving
%% the handle positioned at Lowpoint.
truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
    {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
    ok = file_handle_cache:truncate(FileHdl),
    ok = preallocate(FileHdl, Highpoint, Lowpoint).

%% Full path of Name inside Dir.
form_filename(Dir, Name) -> filename:join(Dir, Name).

%% Map a numeric file id to its on-disk name, e.g. 7 -> "7" ++ ?FILE_EXTENSION.
filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
%% Scan Dir/FileName for valid messages, returning {ok, Messages, FileSize}.
%% A missing file is reported as empty; any other open failure is fatal.
scan_file_for_valid_messages(Dir, FileName) ->
    case open_file(Dir, FileName, ?READ_MODE) of
        {ok, FileHdl} ->
            ScanResult = rabbit_msg_file:scan(FileHdl),
            %% if something really bad's happened, the close could fail,
            %% but ignore
            file_handle_cache:close(FileHdl),
            ScanResult;
        {error, enoent} ->
            {ok, [], 0};
        {error, Reason} ->
            throw({error, {unable_to_scan_file, FileName, Reason}})
    end.
|