diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-10 03:41:03 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-10 03:41:03 +0100 |
commit | d4af34148e2c4b2d53578719cccdd118655f6541 (patch) | |
tree | 0e39c09b55b71d4362c90eb584b5afa2a5e0e951 | |
parent | 94679cd0b53f7cb9abc11fe024d728b32d4181ab (diff) | |
download | rabbitmq-server-d4af34148e2c4b2d53578719cccdd118655f6541.tar.gz |
drop disk_queue/msg_store disk_only mode
and all the mode switching and memory management logic that goes with it.
The 2G limitition of dets make the disk_only mode not worthwhile.
In the process I refactored the msg_location access in msg_store
s.t. it shouldn't be much effort to plug in a different index store in
the future.
Also some minor tweaks and tidying up here and there.
-rw-r--r-- | src/rabbit_disk_queue.erl | 197 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 336 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 43 |
3 files changed, 119 insertions, 457 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b7ed868b..8991939d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -37,7 +37,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([handle_pre_hibernate/1]). -export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3, tx_rollback/1, requeue/2, purge/1, delete_queue/1, @@ -45,10 +44,9 @@ prefetch/1 ]). --export([filesync/0, cache_info/0]). +-export([filesync/0]). --export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0, - to_ram_disk_mode/0]). +-export([stop/0, stop_and_obliterate/0]). %%---------------------------------------------------------------------------- @@ -59,7 +57,6 @@ -define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). -define(BATCH_SIZE, 10000). --define(DISK_ONLY_MODE_FILE, "disk_only_stats.dat"). -define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}). -define(SHUTDOWN_MESSAGE, @@ -68,7 +65,6 @@ is_delivered = never }). --define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs -define(SYNC_INTERVAL, 5). %% milliseconds -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -76,13 +72,10 @@ -define(SERVER, ?MODULE). -record(dqstate, - {operation_mode, %% ram_disk | disk_only - store, %% message store + {store, %% message store sequences, %% next read and write for each q on_sync_txns, %% list of commiters to run on sync (reversed) - commit_timer_ref, %% TRef for our interval timer - memory_report_timer_ref, %% TRef for the memory report timer - mnesia_bytes_per_record %% bytes per record in mnesia in ram_disk mode + commit_timer_ref %% TRef for our interval timer }). %%---------------------------------------------------------------------------- @@ -118,11 +111,7 @@ A, queue_name()) -> A). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). --spec(to_disk_only_mode/0 :: () -> 'ok'). --spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(filesync/0 :: () -> 'ok'). --spec(cache_info/0 :: () -> [{atom(), term()}]). --spec(set_mode/1 :: ('oppressed' | 'liberated') -> 'ok'). -endif. @@ -187,21 +176,9 @@ stop() -> stop_and_obliterate() -> gen_server2:call(?SERVER, stop_vaporise, infinity). -to_disk_only_mode() -> - gen_server2:pcall(?SERVER, 9, to_disk_only_mode, infinity). - -to_ram_disk_mode() -> - gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity). - filesync() -> gen_server2:pcall(?SERVER, 9, filesync). -cache_info() -> - gen_server2:call(?SERVER, cache_info, infinity). - -set_mode(Mode) -> - gen_server2:pcast(?SERVER, 10, {set_mode, Mode}). - %%---------------------------------------------------------------------------- %% gen_server behaviour %%---------------------------------------------------------------------------- @@ -216,54 +193,23 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% brutal_kill. %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), - ok = rabbit_memory_manager:register - (self(), true, rabbit_disk_queue, set_mode, []), - ok = filelib:ensure_dir(form_filename("nothing")), - Node = node(), - {Mode, MnesiaBPR, EtsBPR} = - case lists:member(Node, mnesia:table_info(rabbit_disk_queue, - disc_copies)) of - true -> - %% memory manager assumes we start oppressed. As we're - %% not, make sure it knows about it, by reporting zero - %% memory usage, which ensures it'll tell us to become - %% liberated - rabbit_memory_manager:report_memory( - self(), 0, false), - {ram_disk, undefined, undefined}; - false -> - Path = form_filename(?DISK_ONLY_MODE_FILE), - case rabbit_misc:read_term_file(Path) of - {ok, [{MnesiaBPR1, EtsBPR1}]} -> - {disk_only, MnesiaBPR1, EtsBPR1}; - {error, Reason} -> - throw({error, {cannot_read_disk_only_mode_file, Path, - Reason}}) - end - end, + ok = filelib:ensure_dir(form_filename("nothing")), ok = detect_shutdown_state_and_adjust_delivered_flags(), - Store = rabbit_msg_store:init(Mode, base_directory(), - FileSizeLimit, ReadFileHandlesLimit, - fun msg_ref_gen/1, msg_ref_gen_init(), - EtsBPR), - Store1 = prune(Store), + Store = prune(rabbit_msg_store:init(base_directory(), + FileSizeLimit, ReadFileHandlesLimit, + fun msg_ref_gen/1, msg_ref_gen_init())), Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), - State = - #dqstate { operation_mode = Mode, - store = Store1, - sequences = Sequences, - on_sync_txns = [], - commit_timer_ref = undefined, - memory_report_timer_ref = undefined, - mnesia_bytes_per_record = MnesiaBPR - }, - {ok, start_memory_timer(State), hibernate, + State = #dqstate { store = Store, + sequences = Sequences, + on_sync_txns = [], + commit_timer_ref = undefined }, + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({fetch, Q}, _From, State) -> @@ -294,25 +240,14 @@ handle_call({foldl, Fun, Init, Q}, _From, State) -> reply(Result, State1); handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate -handle_call(stop_vaporise, _From, State = #dqstate { operation_mode = Mode }) -> +handle_call(stop_vaporise, _From, State) -> State1 = shutdown(State), {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), - {atomic, ok} = case Mode of - ram_disk -> {atomic, ok}; - disk_only -> mnesia:change_table_copy_type( - rabbit_disk_queue, node(), disc_copies) - end, lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), {stop, normal, ok, State1}; %% gen_server now calls terminate -handle_call(to_disk_only_mode, _From, State) -> - reply(ok, to_disk_only_mode(State)); -handle_call(to_ram_disk_mode, _From, State) -> - reply(ok, to_ram_disk_mode(State)); handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), - reply(ok, State1); -handle_call(cache_info, _From, State = #dqstate { store = Store }) -> - reply(rabbit_msg_store:cache_info(Store), State). + reply(ok, State1). handle_cast({publish, Q, Message, IsDelivered}, State) -> {ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State), @@ -332,11 +267,6 @@ handle_cast({requeue, Q, MsgSeqIds}, State) -> handle_cast({requeue_next_n, Q, N}, State) -> {ok, State1} = internal_requeue_next_n(Q, N, State), noreply(State1); -handle_cast({set_mode, Mode}, State) -> - noreply((case Mode of - oppressed -> fun to_disk_only_mode/1; - liberated -> fun to_ram_disk_mode/1 - end)(State)); handle_cast({prefetch, Q, From}, State) -> {Result, State1} = internal_fetch_body(Q, record_delivery, peek_queue, State), @@ -352,22 +282,12 @@ handle_cast({prefetch, Q, From}, State) -> end, noreply(State1). -handle_info(report_memory, State) -> - %% call noreply1/2, not noreply/1/2, as we don't want to restart the - %% memory_report_timer_ref. - %% By unsetting the timer, we force a report on the next normal message - noreply1(State #dqstate { memory_report_timer_ref = undefined }); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(timeout, State) -> %% must have commit_timer set, so timeout was 0, and we're not hibernating noreply(sync(State)). -handle_pre_hibernate(State) -> - %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer - ok = report_memory(true, State), - {hibernate, stop_memory_timer(State)}. - terminate(_Reason, State) -> State1 = shutdown(State), store_safe_shutdown(), @@ -376,7 +296,7 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { sequences = undefined }) -> State; shutdown(State = #dqstate { sequences = Sequences, store = Store }) -> - State1 = stop_commit_timer(stop_memory_timer(State)), + State1 = stop_commit_timer(State), Store1 = rabbit_msg_store:cleanup(Store), ets:delete(Sequences), State1 #dqstate { sequences = undefined, store = Store1 }. @@ -385,99 +305,18 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -%% memory management helper functions -%%---------------------------------------------------------------------------- - -stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> - State; -stop_memory_timer(State = #dqstate { memory_report_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #dqstate { memory_report_timer_ref = undefined }. - -start_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> - ok = report_memory(false, State), - {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL, - report_memory), - State #dqstate { memory_report_timer_ref = TRef }; -start_memory_timer(State) -> - State. - -%% Scaling this by 2.5 is a magic number. Found by trial and error to -%% work ok. We are deliberately over reporting so that we run out of -%% memory sooner rather than later, because the transition to disk -%% only modes transiently can take quite a lot of memory. -report_memory(Hibernating, State) -> - Bytes = memory_use(State), - rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes), - Hibernating). - -memory_use(#dqstate { operation_mode = ram_disk, - store = Store, - sequences = Sequences }) -> - WordSize = erlang:system_info(wordsize), - rabbit_msg_store:memory(Store) + - WordSize * ets:info(Sequences, memory) + - WordSize * mnesia:table_info(rabbit_disk_queue, memory); -memory_use(#dqstate { operation_mode = disk_only, - store = Store, - sequences = Sequences, - mnesia_bytes_per_record = MnesiaBytesPerRecord }) -> - WordSize = erlang:system_info(wordsize), - rabbit_msg_store:memory(Store) + - WordSize * ets:info(Sequences, memory) + - rabbit_misc:ceil( - mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord). - -to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) -> - State; -to_disk_only_mode(State = #dqstate { operation_mode = ram_disk, - store = Store }) -> - rabbit_log:info("Converting disk queue to disk only mode~n", []), - MnesiaBPR = erlang:system_info(wordsize) * - mnesia:table_info(rabbit_disk_queue, memory) / - lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]), - EtsBPR = rabbit_msg_store:ets_bpr(Store), - {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), - disc_only_copies), - Store1 = rabbit_msg_store:to_disk_only_mode(Store), - Path = form_filename(?DISK_ONLY_MODE_FILE), - case rabbit_misc:write_term_file(Path, [{MnesiaBPR, EtsBPR}]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_create_disk_only_mode_file, Path, Reason}}) - end, - garbage_collect(), - State #dqstate { operation_mode = disk_only, - store = Store1, - mnesia_bytes_per_record = MnesiaBPR }. - -to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) -> - State; -to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, - store = Store }) -> - rabbit_log:info("Converting disk queue to ram disk mode~n", []), - {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), - disc_copies), - Store1 = rabbit_msg_store:to_ram_disk_mode(Store), - ok = file:delete(form_filename(?DISK_ONLY_MODE_FILE)), - garbage_collect(), - State #dqstate { operation_mode = ram_disk, - store = Store1, - mnesia_bytes_per_record = undefined }. - -%%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- noreply(State) -> - noreply1(start_memory_timer(State)). + noreply1(State). noreply1(State) -> {State1, Timeout} = next_state(State), {noreply, State1, Timeout}. reply(Reply, State) -> - reply1(Reply, start_memory_timer(State)). + reply1(Reply, State). reply1(Reply, State) -> {State1, Timeout} = next_state(State), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b745acbf..5b7afb6c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -31,17 +31,14 @@ -module(rabbit_msg_store). --export([init/7, write/4, read/2, attrs/2, remove/2, release/2, - needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1, - ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]). +-export([init/5, write/4, read/2, attrs/2, remove/2, release/2, + needs_sync/2, sync/1, cleanup/1]). %%---------------------------------------------------------------------------- -record(msstate, - {operation_mode, %% ram_disk | disk_only - dir, %% store directory - msg_location_dets, %% where are messages? - msg_location_ets, %% as above, but for ets version + {dir, %% store directory + msg_locations, %% where are messages? file_summary, %% what's in the files? current_file, %% current file name as number current_file_handle, %% current file handle @@ -51,8 +48,7 @@ file_size_limit, %% how big can our files get? read_file_handle_cache, %% file handle cache for reading last_sync_offset, %% current_offset at the last time we sync'd - message_cache, %% ets message cache - ets_bytes_per_record %% bytes per record in msg_location_ets + message_cache %% ets message cache }). -record(msg_location, @@ -78,8 +74,6 @@ -ifdef(use_specs). --type(mode() :: 'ram_disk' | 'disk_only'). --type(dets_table() :: any()). -type(ets_table() :: any()). -type(msg_id() :: binary()). -type(msg() :: any()). @@ -88,10 +82,8 @@ -type(io_device() :: any()). -type(msstate() :: #msstate { - operation_mode :: mode(), dir :: file_path(), - msg_location_dets :: dets_table(), - msg_location_ets :: ets_table(), + msg_locations :: ets_table(), file_summary :: ets_table(), current_file :: non_neg_integer(), current_file_handle :: io_device(), @@ -100,15 +92,13 @@ file_size_limit :: non_neg_integer(), read_file_handle_cache :: any(), last_sync_offset :: non_neg_integer(), - message_cache :: ets_table(), - ets_bytes_per_record :: non_neg_integer() + message_cache :: ets_table() }). --spec(init/7 :: ('ram_disk' | 'disk_only', file_path(), +-spec(init/5 :: (file_path(), non_neg_integer(), non_neg_integer(), (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), - A, non_neg_integer()) -> - msstate()). + A) -> msstate()). -spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()). -spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). -spec(attrs/2 :: (msg_id(), msstate()) -> msg_attrs() | 'not_found'). @@ -117,11 +107,6 @@ -spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()). -spec(sync/1 :: (msstate()) -> msstate()). -spec(cleanup/1 :: (msstate()) -> msstate()). --spec(cache_info/1 :: (msstate()) -> [{atom(), term()}]). --spec(memory/1 :: (msstate()) -> non_neg_integer()). --spec(ets_bpr/1 :: (msstate()) -> non_neg_integer()). --spec(to_disk_only_mode/1 :: (msstate()) -> msstate()). --spec(to_ram_disk_mode/1 :: (msstate()) -> msstate()). -endif. @@ -129,7 +114,7 @@ %% The components: %% -%% MsgLocation: this is a (d)ets table which contains: +%% MsgLocation: this is a ets table which contains: %% {MsgId, RefCount, File, Offset, TotalSize, Attrs} %% FileSummary: this is an ets table which contains: %% {File, ValidTotalSize, ContiguousTop, Left, Right} @@ -171,15 +156,7 @@ %% possibilites of a crash have occured during a compaction (this %% consists of tidyup - the compaction is deliberately designed such %% that data is duplicated on disk rather than risking it being lost), -%% and rebuild the dets and ets tables (MsgLocation, FileSummary). -%% -%% MsgLocation is deliberately a dets table in order to ensure that we -%% are not RAM constrained. However, for performance reasons, it is -%% possible to call to_ram_disk_mode/0 which will convert MsgLocation -%% to an ets table. This results in a massive performance improvement, -%% at the expense of greater RAM usage. The idea is that when memory -%% gets tight, we switch to disk_only mode but otherwise try to run in -%% ram_disk mode. +%% and rebuild the ets tables (MsgLocation, FileSummary). %% %% So, with this design, messages move to the left. Eventually, they %% should end up in a contiguous block on the left and are then never @@ -257,25 +234,11 @@ %% public API %%---------------------------------------------------------------------------- -init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, - MsgRefDeltaGen, MsgRefDeltaGenInit, EtsBytesPerRecord) -> - - file:delete(msg_location_dets_file(Dir)), - - {ok, MsgLocationDets} = - dets:open_file(?MSG_LOC_NAME, - [{file, msg_location_dets_file(Dir)}, - {min_no_slots, 1024*1024}, - %% man says this should be <= 32M. But it works... - {max_no_slots, 30*1024*1024}, - {type, set}, - {keypos, #msg_location.msg_id} - ]), +init(Dir, FileSizeLimit, ReadFileHandlesLimit, + MsgRefDeltaGen, MsgRefDeltaGenInit) -> - %% it would be better to have this as private, but dets:from_ets/2 - %% seems to blow up if it is set private - see bug21489 - MsgLocationEts = ets:new(?MSG_LOC_NAME, - [set, protected, {keypos, #msg_location.msg_id}]), + MsgLocations = ets:new(?MSG_LOC_NAME, + [set, private, {keypos, #msg_location.msg_id}]), InitFile = 0, HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit, @@ -284,10 +247,8 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, [set, private, {keypos, #file_summary.file}]), MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]), State = - #msstate { operation_mode = Mode, - dir = Dir, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, + #msstate { dir = Dir, + msg_locations = MsgLocations, file_summary = FileSummary, current_file = InitFile, current_file_handle = undefined, @@ -296,8 +257,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit, file_size_limit = FileSizeLimit, read_file_handle_cache = HandleCache, last_sync_offset = 0, - message_cache = MessageCache, - ets_bytes_per_record = EtsBytesPerRecord + message_cache = MessageCache }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), @@ -324,15 +284,14 @@ write(MsgId, Msg, Attrs, current_file = CurFile, current_offset = CurOffset, file_summary = FileSummary }) -> - case dets_ets_lookup(State, MsgId) of - [] -> + case index_lookup(MsgId, State) of + not_found -> %% New message, lots to do {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs), - true = dets_ets_insert_new( - State, #msg_location { - msg_id = MsgId, ref_count = 1, file = CurFile, - offset = CurOffset, total_size = TotalSize, - attrs = Attrs }), + ok = index_insert(#msg_location { + msg_id = MsgId, ref_count = 1, file = CurFile, + offset = CurOffset, total_size = TotalSize, + attrs = Attrs }, State), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, right = undefined }] = @@ -350,23 +309,20 @@ write(MsgId, Msg, Attrs, maybe_roll_to_new_file( NextOffset, State #msstate {current_offset = NextOffset, current_dirty = true}); - [StoreEntry = - #msg_location { msg_id = MsgId, ref_count = RefCount }] -> + StoreEntry = #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter - ok = dets_ets_insert(State, StoreEntry #msg_location { - ref_count = RefCount + 1 }), + ok = index_update(StoreEntry #msg_location { + ref_count = RefCount + 1 }, State), State end. read(MsgId, State) -> - Objs = dets_ets_lookup(State, MsgId), - case Objs of - [] -> - not_found; - [#msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize }] -> + case index_lookup(MsgId, State) of + not_found -> not_found; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> case fetch_and_increment_cache(MsgId, State) of not_found -> {{ok, {MsgId, Msg, _Attrs}}, State1} = @@ -401,10 +357,9 @@ read(MsgId, State) -> end. attrs(MsgId, State) -> - Objs = dets_ets_lookup(State, MsgId), - case Objs of - [] -> not_found; - [#msg_location { msg_id = MsgId, attrs = Attrs }] -> Attrs + case index_lookup(MsgId, State) of + not_found -> not_found; + #msg_location { attrs = Attrs } -> Attrs end. remove(MsgIds, State = #msstate { current_file = CurFile }) -> @@ -430,9 +385,8 @@ needs_sync(_MsgIds, #msstate { current_dirty = false }) -> needs_sync(MsgIds, State = #msstate { current_file = CurFile, last_sync_offset = SyncOffset }) -> lists:any(fun (MsgId) -> - [#msg_location { msg_id = MsgId, file = File, - offset = Offset }] = - dets_ets_lookup(State, MsgId), + #msg_location { file = File, offset = Offset } = + index_lookup(MsgId, State), File =:= CurFile andalso Offset >= SyncOffset end, MsgIds). @@ -443,9 +397,7 @@ sync(State = #msstate { current_file_handle = CurHdl, ok = file:sync(CurHdl), State #msstate { current_dirty = false, last_sync_offset = CurOffset }. -cleanup(State = #msstate { dir = Dir, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts, +cleanup(State = #msstate { msg_locations = MsgLocations, file_summary = FileSummary, current_file_handle = FileHdl, read_file_handle_cache = HC }) -> @@ -456,66 +408,15 @@ cleanup(State = #msstate { dir = Dir, State2 end, HC1 = rabbit_file_handle_cache:close_all(HC), - dets:close(MsgLocationDets), - file:delete(msg_location_dets_file(Dir)), - ets:delete(MsgLocationEts), + ets:delete(MsgLocations), ets:delete(FileSummary), - State1 #msstate { msg_location_dets = undefined, - msg_location_ets = undefined, + State1 #msstate { msg_locations = undefined, file_summary = undefined, current_file_handle = undefined, current_dirty = false, read_file_handle_cache = HC1 }. -cache_info(#msstate { message_cache = Cache }) -> - ets:info(Cache). - -memory(#msstate { operation_mode = ram_disk, - file_summary = FileSummary, - msg_location_ets = MsgLocationEts, - message_cache = Cache }) -> - erlang:system_info(wordsize) * - lists:sum([ets:info(Table, memory) || - Table <- [FileSummary, MsgLocationEts, Cache]]); -memory(#msstate { operation_mode = disk_only, - file_summary = FileSummary, - msg_location_dets = MsgLocationDets, - message_cache = Cache, - ets_bytes_per_record = EtsBytesPerRecord }) -> - erlang:system_info(wordsize) * - lists:sum([ets:info(Table, memory) || - Table <- [FileSummary, Cache]]) + - rabbit_misc:ceil(dets:info(MsgLocationDets, size) * EtsBytesPerRecord). - -ets_bpr(#msstate { operation_mode = disk_only, - ets_bytes_per_record = EtsBytesPerRecord }) -> - EtsBytesPerRecord; -ets_bpr(#msstate { operation_mode = ram_disk, - msg_location_ets = MsgLocationEts }) -> - erlang:system_info(wordsize) * ets:info(MsgLocationEts, memory) / - lists:max([1, ets:info(MsgLocationEts, size)]). - -to_disk_only_mode(State = #msstate { operation_mode = disk_only }) -> - State; -to_disk_only_mode(State = #msstate { operation_mode = ram_disk, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> - ok = dets:from_ets(MsgLocationDets, MsgLocationEts), - true = ets:delete_all_objects(MsgLocationEts), - State #msstate { operation_mode = disk_only, - ets_bytes_per_record = ets_bpr(State) }. - -to_ram_disk_mode(State = #msstate { operation_mode = ram_disk }) -> - State; -to_ram_disk_mode(State = #msstate { operation_mode = disk_only, - msg_location_dets = MsgLocationDets, - msg_location_ets = MsgLocationEts }) -> - true = ets:from_dets(MsgLocationEts, MsgLocationDets), - ok = dets:delete_all_objects(MsgLocationDets), - State #msstate { operation_mode = ram_disk, - ets_bytes_per_record = undefined }. - %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- @@ -526,9 +427,6 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). -msg_location_dets_file(Dir) -> - form_filename(Dir, atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS). - open_file(Dir, FileName, Mode) -> file:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode). @@ -563,13 +461,12 @@ with_read_handle_at(File, Offset, Fun, {Result, State1 #msstate { read_file_handle_cache = HC1 }}. remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> - [StoreEntry = - #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize }] = - dets_ets_lookup(State, MsgId), + StoreEntry = #msg_location { ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize } = + index_lookup(MsgId, State), case RefCount of 1 -> - ok = dets_ets_delete(State, MsgId), + ok = index_delete(MsgId, State), ok = remove_cache_entry(MsgId, State), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop }] = @@ -582,8 +479,8 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> {compact, File}; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), - ok = dets_ets_insert(State, StoreEntry #msg_location { - ref_count = RefCount - 1 }), + ok = index_update(StoreEntry #msg_location { + ref_count = RefCount - 1 }, State), no_compact end. @@ -628,52 +525,39 @@ cache_is_full(Cache) -> ets:info(Cache, memory) > ?CACHE_MAX_SIZE. %%---------------------------------------------------------------------------- -%% dets/ets agnosticism +%% index %%---------------------------------------------------------------------------- -dets_ets_lookup(#msstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Key) -> - dets:lookup(MsgLocationDets, Key); -dets_ets_lookup(#msstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Key) -> - ets:lookup(MsgLocationEts, Key). - -dets_ets_delete(#msstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Key) -> - ok = dets:delete(MsgLocationDets, Key); -dets_ets_delete(#msstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Key) -> - true = ets:delete(MsgLocationEts, Key), +index_lookup(Key, #msstate { msg_locations = MsgLocations }) -> + case ets:lookup(MsgLocations, Key) of + [] -> not_found; + [Entry] -> Entry + end. + +index_insert(Obj, #msstate { msg_locations = MsgLocations }) -> + true = ets:insert_new(MsgLocations, Obj), ok. -dets_ets_insert(#msstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Obj) -> - ok = dets:insert(MsgLocationDets, Obj); -dets_ets_insert(#msstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Obj) -> - true = ets:insert(MsgLocationEts, Obj), +index_update(Obj, #msstate { msg_locations = MsgLocations }) -> + true = ets:insert(MsgLocations, Obj), ok. -dets_ets_insert_new(#msstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Obj) -> - true = dets:insert_new(MsgLocationDets, Obj); -dets_ets_insert_new(#msstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Obj) -> - true = ets:insert_new(MsgLocationEts, Obj). - -dets_ets_match_object(#msstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, Obj) -> - dets:match_object(MsgLocationDets, Obj); -dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, Obj) -> - ets:match_object(MsgLocationEts, Obj). - -dets_ets_select_delete(#msstate { msg_location_dets = MsgLocationDets, - operation_mode = disk_only }, MatchSpec) -> - dets:select_delete(MsgLocationDets, MatchSpec); -dets_ets_select_delete(#msstate { msg_location_ets = MsgLocationEts, - operation_mode = ram_disk }, MatchSpec) -> - ets:select_delete(MsgLocationEts, MatchSpec). +index_delete(Key, #msstate { msg_locations = MsgLocations }) -> + true = ets:delete(MsgLocations, Key), + ok. + +index_search_by_file(File, #msstate { msg_locations = MsgLocations }) -> + lists:sort(fun (#msg_location { offset = OffA }, + #msg_location { offset = OffB }) -> + OffA < OffB + end, ets:match_object(MsgLocations, + #msg_location { file = File, _ = '_' })). + + +index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) -> + MatchHead = #msg_location { file = File, _ = '_' }, + ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]), + ok. %%---------------------------------------------------------------------------- %% recovery @@ -684,33 +568,27 @@ count_msg_refs(Gen, Seed, State) -> finished -> ok; {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State); {MsgId, Delta, Next} -> - case dets_ets_lookup(State, MsgId) of - [] -> true = dets_ets_insert_new( - State, #msg_location { msg_id = MsgId, - ref_count = Delta }); - [StoreEntry = #msg_location { msg_id = MsgId, - ref_count = RefCount }] -> + case index_lookup(MsgId, State) of + not_found -> + ok = index_insert(#msg_location { msg_id = MsgId, + ref_count = Delta }, + State); + StoreEntry = #msg_location { ref_count = RefCount } -> NewRefCount = RefCount + Delta, case NewRefCount of - 0 -> ok = dets_ets_delete(State, MsgId); - _ -> ok = dets_ets_insert( - State, StoreEntry #msg_location { - ref_count = NewRefCount }) + 0 -> ok = index_delete(MsgId, State); + _ -> ok = index_update(StoreEntry #msg_location { + ref_count = NewRefCount }, + State) end end, count_msg_refs(Gen, Next, State) end. verify_messages_referenced(State, MsgIds) -> - lists:foreach(fun (MsgId) -> [_] = dets_ets_lookup(State, MsgId) end, - MsgIds). - -prune_stale_refs(State) -> - MatchHead = #msg_location { file = undefined, _ = '_' }, - case dets_ets_select_delete(State, [{MatchHead, [], [true]}]) of - N when is_number(N) -> ok; - Other -> Other - end. + lists:foreach(fun (MsgId) -> + #msg_location {} = index_lookup(MsgId, State) + end, MsgIds). recover_crashed_compactions(Dir, FileNames, TmpFileNames, State) -> lists:foreach(fun (TmpFileName) -> @@ -865,9 +743,9 @@ load_messages(Files, State) -> load_messages(undefined, Files, State). load_messages(Left, [], State) -> - ok = prune_stale_refs(State), + ok = index_delete_by_file(undefined, State), Offset = - case sort_msg_locations_by_offset(desc, Left, State) of + case lists:reverse(index_search_by_file(Left, State)) of [] -> 0; [#msg_location { offset = MaxOffset, total_size = TotalSize } | _] -> @@ -879,14 +757,13 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case dets_ets_lookup(State, MsgId) of - [] -> {VMAcc, VTSAcc}; - [StoreEntry] -> - ok = dets_ets_insert( - State, StoreEntry #msg_location { - file = File, offset = Offset, - total_size = TotalSize, - attrs = Attrs }), + case index_lookup(MsgId, State) of + not_found -> {VMAcc, VTSAcc}; + StoreEntry -> + ok = index_update(StoreEntry #msg_location { + file = File, offset = Offset, + total_size = TotalSize, + attrs = Attrs }, State), {[Obj | VMAcc], VTSAcc + TotalSize} end end, {[], 0}, Messages), @@ -1004,17 +881,6 @@ adjust_meta_and_combine( true -> {false, State} end. -sort_msg_locations_by_offset(Dir, File, State) -> - Comp = case Dir of - asc -> fun erlang:'<'/2; - desc -> fun erlang:'>'/2 - end, - lists:sort(fun (#msg_location { offset = OffA }, - #msg_location { offset = OffB }) -> - Comp(OffA, OffB) - end, dets_ets_match_object( - State, #msg_location { file = File, _ = '_' })). - combine_files(#file_summary { file = Source, valid_total_size = SourceValid, left = Destination }, @@ -1055,7 +921,7 @@ combine_files(#file_summary { file = Source, %% that the list should be naturally sorted %% as we require, however, we need to %% enforce it anyway - end, sort_msg_locations_by_offset(asc, Destination, State1)), + end, index_search_by_file(Destination, State1)), ok = copy_messages( Worklist, DestinationContiguousTop, DestinationValid, DestinationHdl, TmpHdl, Destination, State1), @@ -1073,7 +939,7 @@ combine_files(#file_summary { file = Source, ok = file:close(TmpHdl), ok = file:delete(form_filename(Dir, Tmp)) end, - SourceWorkList = sort_msg_locations_by_offset(asc, Source, State1), + SourceWorkList = index_search_by_file(Source, State1), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State1), %% tidy up @@ -1092,9 +958,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocationDets to reflect change of file and offset - ok = dets_ets_insert(State, StoreEntry #msg_location { - file = Destination, - offset = CurOffset }), + ok = index_update(StoreEntry #msg_location { + file = Destination, + offset = CurOffset }, State), NextOffset = CurOffset + TotalSize, if BlockStart =:= undefined -> %% base case, called only for the first list elem diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1f2187bc..1e50696a 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -828,7 +828,6 @@ test_disk_queue() -> passed = rdq_test_purge(), passed = rdq_test_mixed_queue_modes(), passed = rdq_test_mode_conversion_mid_txn(), - passed = rdq_test_disk_queue_modes(), rdq_virgin(), passed. @@ -1266,47 +1265,6 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) - 0 = rabbit_mixed_queue:len(MS11), passed. -rdq_test_disk_queue_modes() -> - rdq_virgin(), - rdq_start(), - Msg = <<0:(8*256)>>, - Total = 1000, - Half1 = lists:seq(1,round(Total/2)), - Half2 = lists:seq(1 + round(Total/2), Total), - CommitHalf1 = commit_list(Half1, round(Total/2)), - CommitHalf2 = commit_list(Half2, Total - round(Total/2)), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1], - ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []), - io:format("Publish done~n", []), - ok = rabbit_disk_queue:to_disk_only_mode(), - io:format("To Disk Only done~n", []), - [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2], - ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []), - Seqs = [begin - Remaining = Total - N, - {Message, false, SeqId, Remaining} = - rabbit_disk_queue:fetch(q), - ok = rdq_match_message(Message, N, Msg, 256), - SeqId - end || N <- Half1], - io:format("Deliver first half done~n", []), - ok = rabbit_disk_queue:to_ram_disk_mode(), - io:format("To RAM Disk done~n", []), - Seqs2 = [begin - Remaining = Total - N, - {Message, false, SeqId, Remaining} = - rabbit_disk_queue:fetch(q), - ok = rdq_match_message(Message, N, Msg, 256), - SeqId - end || N <- Half2], - io:format("Deliver second half done~n", []), - ok = rabbit_disk_queue:tx_commit(q, [], Seqs), - ok = rabbit_disk_queue:to_disk_only_mode(), - ok = rabbit_disk_queue:tx_commit(q, [], Seqs2), - empty = rabbit_disk_queue:fetch(q), - rdq_stop(), - passed. - rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). @@ -1319,7 +1277,6 @@ rdq_virgin() -> rdq_start() -> {ok, _} = rabbit_disk_queue:start_link(), - ok = rabbit_disk_queue:to_ram_disk_mode(), ok. rdq_stop() -> |