summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-10 03:41:03 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-10 03:41:03 +0100
commitd4af34148e2c4b2d53578719cccdd118655f6541 (patch)
tree0e39c09b55b71d4362c90eb584b5afa2a5e0e951
parent94679cd0b53f7cb9abc11fe024d728b32d4181ab (diff)
downloadrabbitmq-server-d4af34148e2c4b2d53578719cccdd118655f6541.tar.gz
drop disk_queue/msg_store disk_only mode
and all the mode switching and memory management logic that goes with it. The 2G limitition of dets make the disk_only mode not worthwhile. In the process I refactored the msg_location access in msg_store s.t. it shouldn't be much effort to plug in a different index store in the future. Also some minor tweaks and tidying up here and there.
-rw-r--r--src/rabbit_disk_queue.erl197
-rw-r--r--src/rabbit_msg_store.erl336
-rw-r--r--src/rabbit_tests.erl43
3 files changed, 119 insertions, 457 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b7ed868b..8991939d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -37,7 +37,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([handle_pre_hibernate/1]).
-export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3,
tx_rollback/1, requeue/2, purge/1, delete_queue/1,
@@ -45,10 +44,9 @@
prefetch/1
]).
--export([filesync/0, cache_info/0]).
+-export([filesync/0]).
--export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0,
- to_ram_disk_mode/0]).
+-export([stop/0, stop_and_obliterate/0]).
%%----------------------------------------------------------------------------
@@ -59,7 +57,6 @@
-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
-define(BATCH_SIZE, 10000).
--define(DISK_ONLY_MODE_FILE, "disk_only_stats.dat").
-define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}).
-define(SHUTDOWN_MESSAGE,
@@ -68,7 +65,6 @@
is_delivered = never
}).
--define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -76,13 +72,10 @@
-define(SERVER, ?MODULE).
-record(dqstate,
- {operation_mode, %% ram_disk | disk_only
- store, %% message store
+ {store, %% message store
sequences, %% next read and write for each q
on_sync_txns, %% list of commiters to run on sync (reversed)
- commit_timer_ref, %% TRef for our interval timer
- memory_report_timer_ref, %% TRef for the memory report timer
- mnesia_bytes_per_record %% bytes per record in mnesia in ram_disk mode
+ commit_timer_ref %% TRef for our interval timer
}).
%%----------------------------------------------------------------------------
@@ -118,11 +111,7 @@
A, queue_name()) -> A).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
--spec(to_disk_only_mode/0 :: () -> 'ok').
--spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(filesync/0 :: () -> 'ok').
--spec(cache_info/0 :: () -> [{atom(), term()}]).
--spec(set_mode/1 :: ('oppressed' | 'liberated') -> 'ok').
-endif.
@@ -187,21 +176,9 @@ stop() ->
stop_and_obliterate() ->
gen_server2:call(?SERVER, stop_vaporise, infinity).
-to_disk_only_mode() ->
- gen_server2:pcall(?SERVER, 9, to_disk_only_mode, infinity).
-
-to_ram_disk_mode() ->
- gen_server2:pcall(?SERVER, 9, to_ram_disk_mode, infinity).
-
filesync() ->
gen_server2:pcall(?SERVER, 9, filesync).
-cache_info() ->
- gen_server2:call(?SERVER, cache_info, infinity).
-
-set_mode(Mode) ->
- gen_server2:pcast(?SERVER, 10, {set_mode, Mode}).
-
%%----------------------------------------------------------------------------
%% gen_server behaviour
%%----------------------------------------------------------------------------
@@ -216,54 +193,23 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- ok = rabbit_memory_manager:register
- (self(), true, rabbit_disk_queue, set_mode, []),
- ok = filelib:ensure_dir(form_filename("nothing")),
- Node = node(),
- {Mode, MnesiaBPR, EtsBPR} =
- case lists:member(Node, mnesia:table_info(rabbit_disk_queue,
- disc_copies)) of
- true ->
- %% memory manager assumes we start oppressed. As we're
- %% not, make sure it knows about it, by reporting zero
- %% memory usage, which ensures it'll tell us to become
- %% liberated
- rabbit_memory_manager:report_memory(
- self(), 0, false),
- {ram_disk, undefined, undefined};
- false ->
- Path = form_filename(?DISK_ONLY_MODE_FILE),
- case rabbit_misc:read_term_file(Path) of
- {ok, [{MnesiaBPR1, EtsBPR1}]} ->
- {disk_only, MnesiaBPR1, EtsBPR1};
- {error, Reason} ->
- throw({error, {cannot_read_disk_only_mode_file, Path,
- Reason}})
- end
- end,
+ ok = filelib:ensure_dir(form_filename("nothing")),
ok = detect_shutdown_state_and_adjust_delivered_flags(),
- Store = rabbit_msg_store:init(Mode, base_directory(),
- FileSizeLimit, ReadFileHandlesLimit,
- fun msg_ref_gen/1, msg_ref_gen_init(),
- EtsBPR),
- Store1 = prune(Store),
+ Store = prune(rabbit_msg_store:init(base_directory(),
+ FileSizeLimit, ReadFileHandlesLimit,
+ fun msg_ref_gen/1, msg_ref_gen_init())),
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
- State =
- #dqstate { operation_mode = Mode,
- store = Store1,
- sequences = Sequences,
- on_sync_txns = [],
- commit_timer_ref = undefined,
- memory_report_timer_ref = undefined,
- mnesia_bytes_per_record = MnesiaBPR
- },
- {ok, start_memory_timer(State), hibernate,
+ State = #dqstate { store = Store,
+ sequences = Sequences,
+ on_sync_txns = [],
+ commit_timer_ref = undefined },
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({fetch, Q}, _From, State) ->
@@ -294,25 +240,14 @@ handle_call({foldl, Fun, Init, Q}, _From, State) ->
reply(Result, State1);
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
-handle_call(stop_vaporise, _From, State = #dqstate { operation_mode = Mode }) ->
+handle_call(stop_vaporise, _From, State) ->
State1 = shutdown(State),
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
- {atomic, ok} = case Mode of
- ram_disk -> {atomic, ok};
- disk_only -> mnesia:change_table_copy_type(
- rabbit_disk_queue, node(), disc_copies)
- end,
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok, State1}; %% gen_server now calls terminate
-handle_call(to_disk_only_mode, _From, State) ->
- reply(ok, to_disk_only_mode(State));
-handle_call(to_ram_disk_mode, _From, State) ->
- reply(ok, to_ram_disk_mode(State));
handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
{ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
- reply(ok, State1);
-handle_call(cache_info, _From, State = #dqstate { store = Store }) ->
- reply(rabbit_msg_store:cache_info(Store), State).
+ reply(ok, State1).
handle_cast({publish, Q, Message, IsDelivered}, State) ->
{ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State),
@@ -332,11 +267,6 @@ handle_cast({requeue, Q, MsgSeqIds}, State) ->
handle_cast({requeue_next_n, Q, N}, State) ->
{ok, State1} = internal_requeue_next_n(Q, N, State),
noreply(State1);
-handle_cast({set_mode, Mode}, State) ->
- noreply((case Mode of
- oppressed -> fun to_disk_only_mode/1;
- liberated -> fun to_ram_disk_mode/1
- end)(State));
handle_cast({prefetch, Q, From}, State) ->
{Result, State1} =
internal_fetch_body(Q, record_delivery, peek_queue, State),
@@ -352,22 +282,12 @@ handle_cast({prefetch, Q, From}, State) ->
end,
noreply(State1).
-handle_info(report_memory, State) ->
- %% call noreply1/2, not noreply/1/2, as we don't want to restart the
- %% memory_report_timer_ref.
- %% By unsetting the timer, we force a report on the next normal message
- noreply1(State #dqstate { memory_report_timer_ref = undefined });
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(timeout, State) ->
%% must have commit_timer set, so timeout was 0, and we're not hibernating
noreply(sync(State)).
-handle_pre_hibernate(State) ->
- %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
- ok = report_memory(true, State),
- {hibernate, stop_memory_timer(State)}.
-
terminate(_Reason, State) ->
State1 = shutdown(State),
store_safe_shutdown(),
@@ -376,7 +296,7 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { sequences = undefined }) ->
State;
shutdown(State = #dqstate { sequences = Sequences, store = Store }) ->
- State1 = stop_commit_timer(stop_memory_timer(State)),
+ State1 = stop_commit_timer(State),
Store1 = rabbit_msg_store:cleanup(Store),
ets:delete(Sequences),
State1 #dqstate { sequences = undefined, store = Store1 }.
@@ -385,99 +305,18 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-%% memory management helper functions
-%%----------------------------------------------------------------------------
-
-stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
- State;
-stop_memory_timer(State = #dqstate { memory_report_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #dqstate { memory_report_timer_ref = undefined }.
-
-start_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
- ok = report_memory(false, State),
- {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
- report_memory),
- State #dqstate { memory_report_timer_ref = TRef };
-start_memory_timer(State) ->
- State.
-
-%% Scaling this by 2.5 is a magic number. Found by trial and error to
-%% work ok. We are deliberately over reporting so that we run out of
-%% memory sooner rather than later, because the transition to disk
-%% only modes transiently can take quite a lot of memory.
-report_memory(Hibernating, State) ->
- Bytes = memory_use(State),
- rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes),
- Hibernating).
-
-memory_use(#dqstate { operation_mode = ram_disk,
- store = Store,
- sequences = Sequences }) ->
- WordSize = erlang:system_info(wordsize),
- rabbit_msg_store:memory(Store) +
- WordSize * ets:info(Sequences, memory) +
- WordSize * mnesia:table_info(rabbit_disk_queue, memory);
-memory_use(#dqstate { operation_mode = disk_only,
- store = Store,
- sequences = Sequences,
- mnesia_bytes_per_record = MnesiaBytesPerRecord }) ->
- WordSize = erlang:system_info(wordsize),
- rabbit_msg_store:memory(Store) +
- WordSize * ets:info(Sequences, memory) +
- rabbit_misc:ceil(
- mnesia:table_info(rabbit_disk_queue, size) * MnesiaBytesPerRecord).
-
-to_disk_only_mode(State = #dqstate { operation_mode = disk_only }) ->
- State;
-to_disk_only_mode(State = #dqstate { operation_mode = ram_disk,
- store = Store }) ->
- rabbit_log:info("Converting disk queue to disk only mode~n", []),
- MnesiaBPR = erlang:system_info(wordsize) *
- mnesia:table_info(rabbit_disk_queue, memory) /
- lists:max([1, mnesia:table_info(rabbit_disk_queue, size)]),
- EtsBPR = rabbit_msg_store:ets_bpr(Store),
- {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
- disc_only_copies),
- Store1 = rabbit_msg_store:to_disk_only_mode(Store),
- Path = form_filename(?DISK_ONLY_MODE_FILE),
- case rabbit_misc:write_term_file(Path, [{MnesiaBPR, EtsBPR}]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_disk_only_mode_file, Path, Reason}})
- end,
- garbage_collect(),
- State #dqstate { operation_mode = disk_only,
- store = Store1,
- mnesia_bytes_per_record = MnesiaBPR }.
-
-to_ram_disk_mode(State = #dqstate { operation_mode = ram_disk }) ->
- State;
-to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
- store = Store }) ->
- rabbit_log:info("Converting disk queue to ram disk mode~n", []),
- {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
- disc_copies),
- Store1 = rabbit_msg_store:to_ram_disk_mode(Store),
- ok = file:delete(form_filename(?DISK_ONLY_MODE_FILE)),
- garbage_collect(),
- State #dqstate { operation_mode = ram_disk,
- store = Store1,
- mnesia_bytes_per_record = undefined }.
-
-%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
noreply(State) ->
- noreply1(start_memory_timer(State)).
+ noreply1(State).
noreply1(State) ->
{State1, Timeout} = next_state(State),
{noreply, State1, Timeout}.
reply(Reply, State) ->
- reply1(Reply, start_memory_timer(State)).
+ reply1(Reply, State).
reply1(Reply, State) ->
{State1, Timeout} = next_state(State),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b745acbf..5b7afb6c 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,17 +31,14 @@
-module(rabbit_msg_store).
--export([init/7, write/4, read/2, attrs/2, remove/2, release/2,
- needs_sync/2, sync/1, cleanup/1, cache_info/1, memory/1,
- ets_bpr/1, to_disk_only_mode/1, to_ram_disk_mode/1]).
+-export([init/5, write/4, read/2, attrs/2, remove/2, release/2,
+ needs_sync/2, sync/1, cleanup/1]).
%%----------------------------------------------------------------------------
-record(msstate,
- {operation_mode, %% ram_disk | disk_only
- dir, %% store directory
- msg_location_dets, %% where are messages?
- msg_location_ets, %% as above, but for ets version
+ {dir, %% store directory
+ msg_locations, %% where are messages?
file_summary, %% what's in the files?
current_file, %% current file name as number
current_file_handle, %% current file handle
@@ -51,8 +48,7 @@
file_size_limit, %% how big can our files get?
read_file_handle_cache, %% file handle cache for reading
last_sync_offset, %% current_offset at the last time we sync'd
- message_cache, %% ets message cache
- ets_bytes_per_record %% bytes per record in msg_location_ets
+ message_cache %% ets message cache
}).
-record(msg_location,
@@ -78,8 +74,6 @@
-ifdef(use_specs).
--type(mode() :: 'ram_disk' | 'disk_only').
--type(dets_table() :: any()).
-type(ets_table() :: any()).
-type(msg_id() :: binary()).
-type(msg() :: any()).
@@ -88,10 +82,8 @@
-type(io_device() :: any()).
-type(msstate() :: #msstate {
- operation_mode :: mode(),
dir :: file_path(),
- msg_location_dets :: dets_table(),
- msg_location_ets :: ets_table(),
+ msg_locations :: ets_table(),
file_summary :: ets_table(),
current_file :: non_neg_integer(),
current_file_handle :: io_device(),
@@ -100,15 +92,13 @@
file_size_limit :: non_neg_integer(),
read_file_handle_cache :: any(),
last_sync_offset :: non_neg_integer(),
- message_cache :: ets_table(),
- ets_bytes_per_record :: non_neg_integer()
+ message_cache :: ets_table()
}).
--spec(init/7 :: ('ram_disk' | 'disk_only', file_path(),
+-spec(init/5 :: (file_path(),
non_neg_integer(), non_neg_integer(),
(fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})),
- A, non_neg_integer()) ->
- msstate()).
+ A) -> msstate()).
-spec(write/4 :: (msg_id(), msg(), msg_attrs(), msstate()) -> msstate()).
-spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
-spec(attrs/2 :: (msg_id(), msstate()) -> msg_attrs() | 'not_found').
@@ -117,11 +107,6 @@
-spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()).
-spec(sync/1 :: (msstate()) -> msstate()).
-spec(cleanup/1 :: (msstate()) -> msstate()).
--spec(cache_info/1 :: (msstate()) -> [{atom(), term()}]).
--spec(memory/1 :: (msstate()) -> non_neg_integer()).
--spec(ets_bpr/1 :: (msstate()) -> non_neg_integer()).
--spec(to_disk_only_mode/1 :: (msstate()) -> msstate()).
--spec(to_ram_disk_mode/1 :: (msstate()) -> msstate()).
-endif.
@@ -129,7 +114,7 @@
%% The components:
%%
-%% MsgLocation: this is a (d)ets table which contains:
+%% MsgLocation: this is a ets table which contains:
%% {MsgId, RefCount, File, Offset, TotalSize, Attrs}
%% FileSummary: this is an ets table which contains:
%% {File, ValidTotalSize, ContiguousTop, Left, Right}
@@ -171,15 +156,7 @@
%% possibilites of a crash have occured during a compaction (this
%% consists of tidyup - the compaction is deliberately designed such
%% that data is duplicated on disk rather than risking it being lost),
-%% and rebuild the dets and ets tables (MsgLocation, FileSummary).
-%%
-%% MsgLocation is deliberately a dets table in order to ensure that we
-%% are not RAM constrained. However, for performance reasons, it is
-%% possible to call to_ram_disk_mode/0 which will convert MsgLocation
-%% to an ets table. This results in a massive performance improvement,
-%% at the expense of greater RAM usage. The idea is that when memory
-%% gets tight, we switch to disk_only mode but otherwise try to run in
-%% ram_disk mode.
+%% and rebuild the ets tables (MsgLocation, FileSummary).
%%
%% So, with this design, messages move to the left. Eventually, they
%% should end up in a contiguous block on the left and are then never
@@ -257,25 +234,11 @@
%% public API
%%----------------------------------------------------------------------------
-init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit,
- MsgRefDeltaGen, MsgRefDeltaGenInit, EtsBytesPerRecord) ->
-
- file:delete(msg_location_dets_file(Dir)),
-
- {ok, MsgLocationDets} =
- dets:open_file(?MSG_LOC_NAME,
- [{file, msg_location_dets_file(Dir)},
- {min_no_slots, 1024*1024},
- %% man says this should be <= 32M. But it works...
- {max_no_slots, 30*1024*1024},
- {type, set},
- {keypos, #msg_location.msg_id}
- ]),
+init(Dir, FileSizeLimit, ReadFileHandlesLimit,
+ MsgRefDeltaGen, MsgRefDeltaGenInit) ->
- %% it would be better to have this as private, but dets:from_ets/2
- %% seems to blow up if it is set private - see bug21489
- MsgLocationEts = ets:new(?MSG_LOC_NAME,
- [set, protected, {keypos, #msg_location.msg_id}]),
+ MsgLocations = ets:new(?MSG_LOC_NAME,
+ [set, private, {keypos, #msg_location.msg_id}]),
InitFile = 0,
HandleCache = rabbit_file_handle_cache:init(ReadFileHandlesLimit,
@@ -284,10 +247,8 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit,
[set, private, {keypos, #file_summary.file}]),
MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
State =
- #msstate { operation_mode = Mode,
- dir = Dir,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
+ #msstate { dir = Dir,
+ msg_locations = MsgLocations,
file_summary = FileSummary,
current_file = InitFile,
current_file_handle = undefined,
@@ -296,8 +257,7 @@ init(Mode, Dir, FileSizeLimit, ReadFileHandlesLimit,
file_size_limit = FileSizeLimit,
read_file_handle_cache = HandleCache,
last_sync_offset = 0,
- message_cache = MessageCache,
- ets_bytes_per_record = EtsBytesPerRecord
+ message_cache = MessageCache
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -324,15 +284,14 @@ write(MsgId, Msg, Attrs,
current_file = CurFile,
current_offset = CurOffset,
file_summary = FileSummary }) ->
- case dets_ets_lookup(State, MsgId) of
- [] ->
+ case index_lookup(MsgId, State) of
+ not_found ->
%% New message, lots to do
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg, Attrs),
- true = dets_ets_insert_new(
- State, #msg_location {
- msg_id = MsgId, ref_count = 1, file = CurFile,
- offset = CurOffset, total_size = TotalSize,
- attrs = Attrs }),
+ ok = index_insert(#msg_location {
+ msg_id = MsgId, ref_count = 1, file = CurFile,
+ offset = CurOffset, total_size = TotalSize,
+ attrs = Attrs }, State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined }] =
@@ -350,23 +309,20 @@ write(MsgId, Msg, Attrs,
maybe_roll_to_new_file(
NextOffset, State #msstate {current_offset = NextOffset,
current_dirty = true});
- [StoreEntry =
- #msg_location { msg_id = MsgId, ref_count = RefCount }] ->
+ StoreEntry = #msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter
- ok = dets_ets_insert(State, StoreEntry #msg_location {
- ref_count = RefCount + 1 }),
+ ok = index_update(StoreEntry #msg_location {
+ ref_count = RefCount + 1 }, State),
State
end.
read(MsgId, State) ->
- Objs = dets_ets_lookup(State, MsgId),
- case Objs of
- [] ->
- not_found;
- [#msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize }] ->
+ case index_lookup(MsgId, State) of
+ not_found -> not_found;
+ #msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize } ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
{{ok, {MsgId, Msg, _Attrs}}, State1} =
@@ -401,10 +357,9 @@ read(MsgId, State) ->
end.
attrs(MsgId, State) ->
- Objs = dets_ets_lookup(State, MsgId),
- case Objs of
- [] -> not_found;
- [#msg_location { msg_id = MsgId, attrs = Attrs }] -> Attrs
+ case index_lookup(MsgId, State) of
+ not_found -> not_found;
+ #msg_location { attrs = Attrs } -> Attrs
end.
remove(MsgIds, State = #msstate { current_file = CurFile }) ->
@@ -430,9 +385,8 @@ needs_sync(_MsgIds, #msstate { current_dirty = false }) ->
needs_sync(MsgIds, State = #msstate { current_file = CurFile,
last_sync_offset = SyncOffset }) ->
lists:any(fun (MsgId) ->
- [#msg_location { msg_id = MsgId, file = File,
- offset = Offset }] =
- dets_ets_lookup(State, MsgId),
+ #msg_location { file = File, offset = Offset } =
+ index_lookup(MsgId, State),
File =:= CurFile andalso Offset >= SyncOffset
end, MsgIds).
@@ -443,9 +397,7 @@ sync(State = #msstate { current_file_handle = CurHdl,
ok = file:sync(CurHdl),
State #msstate { current_dirty = false, last_sync_offset = CurOffset }.
-cleanup(State = #msstate { dir = Dir,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
+cleanup(State = #msstate { msg_locations = MsgLocations,
file_summary = FileSummary,
current_file_handle = FileHdl,
read_file_handle_cache = HC }) ->
@@ -456,66 +408,15 @@ cleanup(State = #msstate { dir = Dir,
State2
end,
HC1 = rabbit_file_handle_cache:close_all(HC),
- dets:close(MsgLocationDets),
- file:delete(msg_location_dets_file(Dir)),
- ets:delete(MsgLocationEts),
+ ets:delete(MsgLocations),
ets:delete(FileSummary),
- State1 #msstate { msg_location_dets = undefined,
- msg_location_ets = undefined,
+ State1 #msstate { msg_locations = undefined,
file_summary = undefined,
current_file_handle = undefined,
current_dirty = false,
read_file_handle_cache = HC1
}.
-cache_info(#msstate { message_cache = Cache }) ->
- ets:info(Cache).
-
-memory(#msstate { operation_mode = ram_disk,
- file_summary = FileSummary,
- msg_location_ets = MsgLocationEts,
- message_cache = Cache }) ->
- erlang:system_info(wordsize) *
- lists:sum([ets:info(Table, memory) ||
- Table <- [FileSummary, MsgLocationEts, Cache]]);
-memory(#msstate { operation_mode = disk_only,
- file_summary = FileSummary,
- msg_location_dets = MsgLocationDets,
- message_cache = Cache,
- ets_bytes_per_record = EtsBytesPerRecord }) ->
- erlang:system_info(wordsize) *
- lists:sum([ets:info(Table, memory) ||
- Table <- [FileSummary, Cache]]) +
- rabbit_misc:ceil(dets:info(MsgLocationDets, size) * EtsBytesPerRecord).
-
-ets_bpr(#msstate { operation_mode = disk_only,
- ets_bytes_per_record = EtsBytesPerRecord }) ->
- EtsBytesPerRecord;
-ets_bpr(#msstate { operation_mode = ram_disk,
- msg_location_ets = MsgLocationEts }) ->
- erlang:system_info(wordsize) * ets:info(MsgLocationEts, memory) /
- lists:max([1, ets:info(MsgLocationEts, size)]).
-
-to_disk_only_mode(State = #msstate { operation_mode = disk_only }) ->
- State;
-to_disk_only_mode(State = #msstate { operation_mode = ram_disk,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
- ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
- true = ets:delete_all_objects(MsgLocationEts),
- State #msstate { operation_mode = disk_only,
- ets_bytes_per_record = ets_bpr(State) }.
-
-to_ram_disk_mode(State = #msstate { operation_mode = ram_disk }) ->
- State;
-to_ram_disk_mode(State = #msstate { operation_mode = disk_only,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
- true = ets:from_dets(MsgLocationEts, MsgLocationDets),
- ok = dets:delete_all_objects(MsgLocationDets),
- State #msstate { operation_mode = ram_disk,
- ets_bytes_per_record = undefined }.
-
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
@@ -526,9 +427,6 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-msg_location_dets_file(Dir) ->
- form_filename(Dir, atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS).
-
open_file(Dir, FileName, Mode) ->
file:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode).
@@ -563,13 +461,12 @@ with_read_handle_at(File, Offset, Fun,
{Result, State1 #msstate { read_file_handle_cache = HC1 }}.
remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
- [StoreEntry =
- #msg_location { msg_id = MsgId, ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize }] =
- dets_ets_lookup(State, MsgId),
+ StoreEntry = #msg_location { ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize } =
+ index_lookup(MsgId, State),
case RefCount of
1 ->
- ok = dets_ets_delete(State, MsgId),
+ ok = index_delete(MsgId, State),
ok = remove_cache_entry(MsgId, State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop }] =
@@ -582,8 +479,8 @@ remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
{compact, File};
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
- ok = dets_ets_insert(State, StoreEntry #msg_location {
- ref_count = RefCount - 1 }),
+ ok = index_update(StoreEntry #msg_location {
+ ref_count = RefCount - 1 }, State),
no_compact
end.
@@ -628,52 +525,39 @@ cache_is_full(Cache) ->
ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
%%----------------------------------------------------------------------------
-%% dets/ets agnosticism
+%% index
%%----------------------------------------------------------------------------
-dets_ets_lookup(#msstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Key) ->
- dets:lookup(MsgLocationDets, Key);
-dets_ets_lookup(#msstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Key) ->
- ets:lookup(MsgLocationEts, Key).
-
-dets_ets_delete(#msstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Key) ->
- ok = dets:delete(MsgLocationDets, Key);
-dets_ets_delete(#msstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Key) ->
- true = ets:delete(MsgLocationEts, Key),
+index_lookup(Key, #msstate { msg_locations = MsgLocations }) ->
+ case ets:lookup(MsgLocations, Key) of
+ [] -> not_found;
+ [Entry] -> Entry
+ end.
+
+index_insert(Obj, #msstate { msg_locations = MsgLocations }) ->
+ true = ets:insert_new(MsgLocations, Obj),
ok.
-dets_ets_insert(#msstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Obj) ->
- ok = dets:insert(MsgLocationDets, Obj);
-dets_ets_insert(#msstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Obj) ->
- true = ets:insert(MsgLocationEts, Obj),
+index_update(Obj, #msstate { msg_locations = MsgLocations }) ->
+ true = ets:insert(MsgLocations, Obj),
ok.
-dets_ets_insert_new(#msstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Obj) ->
- true = dets:insert_new(MsgLocationDets, Obj);
-dets_ets_insert_new(#msstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Obj) ->
- true = ets:insert_new(MsgLocationEts, Obj).
-
-dets_ets_match_object(#msstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, Obj) ->
- dets:match_object(MsgLocationDets, Obj);
-dets_ets_match_object(#msstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, Obj) ->
- ets:match_object(MsgLocationEts, Obj).
-
-dets_ets_select_delete(#msstate { msg_location_dets = MsgLocationDets,
- operation_mode = disk_only }, MatchSpec) ->
- dets:select_delete(MsgLocationDets, MatchSpec);
-dets_ets_select_delete(#msstate { msg_location_ets = MsgLocationEts,
- operation_mode = ram_disk }, MatchSpec) ->
- ets:select_delete(MsgLocationEts, MatchSpec).
+index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
+ true = ets:delete(MsgLocations, Key),
+ ok.
+
+index_search_by_file(File, #msstate { msg_locations = MsgLocations }) ->
+ lists:sort(fun (#msg_location { offset = OffA },
+ #msg_location { offset = OffB }) ->
+ OffA < OffB
+ end, ets:match_object(MsgLocations,
+ #msg_location { file = File, _ = '_' })).
+
+
+index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
+ MatchHead = #msg_location { file = File, _ = '_' },
+ ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
+ ok.
%%----------------------------------------------------------------------------
%% recovery
@@ -684,33 +568,27 @@ count_msg_refs(Gen, Seed, State) ->
finished -> ok;
{_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State);
{MsgId, Delta, Next} ->
- case dets_ets_lookup(State, MsgId) of
- [] -> true = dets_ets_insert_new(
- State, #msg_location { msg_id = MsgId,
- ref_count = Delta });
- [StoreEntry = #msg_location { msg_id = MsgId,
- ref_count = RefCount }] ->
+ case index_lookup(MsgId, State) of
+ not_found ->
+ ok = index_insert(#msg_location { msg_id = MsgId,
+ ref_count = Delta },
+ State);
+ StoreEntry = #msg_location { ref_count = RefCount } ->
NewRefCount = RefCount + Delta,
case NewRefCount of
- 0 -> ok = dets_ets_delete(State, MsgId);
- _ -> ok = dets_ets_insert(
- State, StoreEntry #msg_location {
- ref_count = NewRefCount })
+ 0 -> ok = index_delete(MsgId, State);
+ _ -> ok = index_update(StoreEntry #msg_location {
+ ref_count = NewRefCount },
+ State)
end
end,
count_msg_refs(Gen, Next, State)
end.
verify_messages_referenced(State, MsgIds) ->
- lists:foreach(fun (MsgId) -> [_] = dets_ets_lookup(State, MsgId) end,
- MsgIds).
-
-prune_stale_refs(State) ->
- MatchHead = #msg_location { file = undefined, _ = '_' },
- case dets_ets_select_delete(State, [{MatchHead, [], [true]}]) of
- N when is_number(N) -> ok;
- Other -> Other
- end.
+ lists:foreach(fun (MsgId) ->
+ #msg_location {} = index_lookup(MsgId, State)
+ end, MsgIds).
recover_crashed_compactions(Dir, FileNames, TmpFileNames, State) ->
lists:foreach(fun (TmpFileName) ->
@@ -865,9 +743,9 @@ load_messages(Files, State) ->
load_messages(undefined, Files, State).
load_messages(Left, [], State) ->
- ok = prune_stale_refs(State),
+ ok = index_delete_by_file(undefined, State),
Offset =
- case sort_msg_locations_by_offset(desc, Left, State) of
+ case lists:reverse(index_search_by_file(Left, State)) of
[] -> 0;
[#msg_location { offset = MaxOffset,
total_size = TotalSize } | _] ->
@@ -879,14 +757,13 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} = lists:foldl(
fun (Obj = {MsgId, Attrs, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case dets_ets_lookup(State, MsgId) of
- [] -> {VMAcc, VTSAcc};
- [StoreEntry] ->
- ok = dets_ets_insert(
- State, StoreEntry #msg_location {
- file = File, offset = Offset,
- total_size = TotalSize,
- attrs = Attrs }),
+ case index_lookup(MsgId, State) of
+ not_found -> {VMAcc, VTSAcc};
+ StoreEntry ->
+ ok = index_update(StoreEntry #msg_location {
+ file = File, offset = Offset,
+ total_size = TotalSize,
+ attrs = Attrs }, State),
{[Obj | VMAcc], VTSAcc + TotalSize}
end
end, {[], 0}, Messages),
@@ -1004,17 +881,6 @@ adjust_meta_and_combine(
true -> {false, State}
end.
-sort_msg_locations_by_offset(Dir, File, State) ->
- Comp = case Dir of
- asc -> fun erlang:'<'/2;
- desc -> fun erlang:'>'/2
- end,
- lists:sort(fun (#msg_location { offset = OffA },
- #msg_location { offset = OffB }) ->
- Comp(OffA, OffB)
- end, dets_ets_match_object(
- State, #msg_location { file = File, _ = '_' })).
-
combine_files(#file_summary { file = Source,
valid_total_size = SourceValid,
left = Destination },
@@ -1055,7 +921,7 @@ combine_files(#file_summary { file = Source,
%% that the list should be naturally sorted
%% as we require, however, we need to
%% enforce it anyway
- end, sort_msg_locations_by_offset(asc, Destination, State1)),
+ end, index_search_by_file(Destination, State1)),
ok = copy_messages(
Worklist, DestinationContiguousTop, DestinationValid,
DestinationHdl, TmpHdl, Destination, State1),
@@ -1073,7 +939,7 @@ combine_files(#file_summary { file = Source,
ok = file:close(TmpHdl),
ok = file:delete(form_filename(Dir, Tmp))
end,
- SourceWorkList = sort_msg_locations_by_offset(asc, Source, State1),
+ SourceWorkList = index_search_by_file(Source, State1),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State1),
%% tidy up
@@ -1092,9 +958,9 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert(State, StoreEntry #msg_location {
- file = Destination,
- offset = CurOffset }),
+ ok = index_update(StoreEntry #msg_location {
+ file = Destination,
+ offset = CurOffset }, State),
NextOffset = CurOffset + TotalSize,
if BlockStart =:= undefined ->
%% base case, called only for the first list elem
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1f2187bc..1e50696a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -828,7 +828,6 @@ test_disk_queue() ->
passed = rdq_test_purge(),
passed = rdq_test_mixed_queue_modes(),
passed = rdq_test_mode_conversion_mid_txn(),
- passed = rdq_test_disk_queue_modes(),
rdq_virgin(),
passed.
@@ -1266,47 +1265,6 @@ rdq_tx_publish_mixed_alter_commit_get(MS0, MsgsA, MsgsB, Mode, CommitOrCancel) -
0 = rabbit_mixed_queue:len(MS11),
passed.
-rdq_test_disk_queue_modes() ->
- rdq_virgin(),
- rdq_start(),
- Msg = <<0:(8*256)>>,
- Total = 1000,
- Half1 = lists:seq(1,round(Total/2)),
- Half2 = lists:seq(1 + round(Total/2), Total),
- CommitHalf1 = commit_list(Half1, round(Total/2)),
- CommitHalf2 = commit_list(Half2, Total - round(Total/2)),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half1],
- ok = rabbit_disk_queue:tx_commit(q, CommitHalf1, []),
- io:format("Publish done~n", []),
- ok = rabbit_disk_queue:to_disk_only_mode(),
- io:format("To Disk Only done~n", []),
- [rabbit_disk_queue:tx_publish(rdq_message(N, Msg, false)) || N <- Half2],
- ok = rabbit_disk_queue:tx_commit(q, CommitHalf2, []),
- Seqs = [begin
- Remaining = Total - N,
- {Message, false, SeqId, Remaining} =
- rabbit_disk_queue:fetch(q),
- ok = rdq_match_message(Message, N, Msg, 256),
- SeqId
- end || N <- Half1],
- io:format("Deliver first half done~n", []),
- ok = rabbit_disk_queue:to_ram_disk_mode(),
- io:format("To RAM Disk done~n", []),
- Seqs2 = [begin
- Remaining = Total - N,
- {Message, false, SeqId, Remaining} =
- rabbit_disk_queue:fetch(q),
- ok = rdq_match_message(Message, N, Msg, 256),
- SeqId
- end || N <- Half2],
- io:format("Deliver second half done~n", []),
- ok = rabbit_disk_queue:tx_commit(q, [], Seqs),
- ok = rabbit_disk_queue:to_disk_only_mode(),
- ok = rabbit_disk_queue:tx_commit(q, [], Seqs2),
- empty = rabbit_disk_queue:fetch(q),
- rdq_stop(),
- passed.
-
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).
@@ -1319,7 +1277,6 @@ rdq_virgin() ->
rdq_start() ->
{ok, _} = rabbit_disk_queue:start_link(),
- ok = rabbit_disk_queue:to_ram_disk_mode(),
ok.
rdq_stop() ->