summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-09-25 14:49:54 +0100
committerMatthias Radestock <matthias@lshift.net>2009-09-25 14:49:54 +0100
commitfa65a1266274ad41b1addf15c83112ee33d6f69d (patch)
tree52a8b978d1b6056d350bc11bee654dccef635b23
parent577a216cc49da650fd0f42a64a684a7e66bede30 (diff)
downloadrabbitmq-server-fa65a1266274ad41b1addf15c83112ee33d6f69d.tar.gz
turn the msg_store into a separate process
-rw-r--r--src/rabbit_disk_queue.erl96
-rw-r--r--src/rabbit_msg_store.erl285
2 files changed, 200 insertions, 181 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 6fe2a4b3..02a8ed8c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -70,8 +70,7 @@
-define(SERVER, ?MODULE).
-record(dqstate,
- {store, %% message store
- sequences, %% next read and write for each q
+ {sequences, %% next read and write for each q
on_sync_txns, %% list of commiters to run on sync (reversed)
commit_timer_ref %% TRef for our interval timer
}).
@@ -196,15 +195,15 @@ init([]) ->
ok = detect_shutdown_state_and_adjust_delivered_flags(),
- Store = rabbit_msg_store:init(base_directory(),
- fun msg_ref_gen/1, msg_ref_gen_init()),
- ok = prune(Store),
+ {ok, _Pid} = rabbit_msg_store:start_link(base_directory(),
+ fun msg_ref_gen/1,
+ msg_ref_gen_init()),
+ ok = prune(),
Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
ok = extract_sequence_numbers(Sequences),
- State = #dqstate { store = Store,
- sequences = Sequences,
+ State = #dqstate { sequences = Sequences,
on_sync_txns = [],
commit_timer_ref = undefined },
{ok, State, hibernate,
@@ -291,11 +290,11 @@ terminate(_Reason, State) ->
shutdown(State = #dqstate { sequences = undefined }) ->
State;
-shutdown(State = #dqstate { sequences = Sequences, store = Store }) ->
+shutdown(State = #dqstate { sequences = Sequences }) ->
State1 = stop_commit_timer(State),
- Store1 = rabbit_msg_store:cleanup(Store),
+ ok = rabbit_msg_store:stop(),
ets:delete(Sequences),
- State1 #dqstate { sequences = undefined, store = Store1 }.
+ State1 #dqstate { sequences = undefined }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -350,12 +349,12 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #dqstate { commit_timer_ref = undefined }.
-sync(State = #dqstate { store = Store, on_sync_txns = Txns }) ->
- State1 = State #dqstate { store = rabbit_msg_store:sync(Store) },
+sync(State = #dqstate { on_sync_txns = Txns }) ->
+ ok = rabbit_msg_store:sync(),
case Txns of
- [] -> State1;
+ [] -> State;
_ -> lists:foldl(fun internal_do_tx_commit/2,
- State1 #dqstate { on_sync_txns = [] },
+ State #dqstate { on_sync_txns = [] },
lists:reverse(Txns))
end.
@@ -363,13 +362,12 @@ sync(State = #dqstate { store = Store, on_sync_txns = Txns }) ->
%% internal functions
%%----------------------------------------------------------------------------
-internal_fetch_body(Q, Advance, State = #dqstate { store = Store }) ->
+internal_fetch_body(Q, Advance, State) ->
case next(Q, record_delivery, Advance, State) of
empty -> {empty, State};
{MsgId, IsDelivered, AckTag, Remaining} ->
- {Message, Store1} = rabbit_msg_store:read(MsgId, Store),
- State1 = State #dqstate { store = Store1 },
- {{Message, IsDelivered, AckTag, Remaining}, State1}
+ {ok, Message} = rabbit_msg_store:read(MsgId),
+ {{Message, IsDelivered, AckTag, Remaining}, State}
end.
internal_fetch_attributes(Q, MarkDelivered, State) ->
@@ -413,41 +411,37 @@ internal_foldl(Q, Fun, Init, State) ->
internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
{ok, Acc, State};
-internal_foldl(Q, WriteSeqId, Fun, State = #dqstate { store = Store },
- Acc, ReadSeqId) ->
+internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
[#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- {Message, Store1} = rabbit_msg_store:read(MsgId, Store),
+ {ok, Message} = rabbit_msg_store:read(MsgId),
Acc1 = Fun(Message, {MsgId, ReadSeqId}, IsDelivered, Acc),
- internal_foldl(Q, WriteSeqId, Fun, State #dqstate { store = Store1 },
- Acc1, ReadSeqId + 1).
+ internal_foldl(Q, WriteSeqId, Fun, State, Acc1, ReadSeqId + 1).
internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, State).
-remove_messages(Q, MsgSeqIds, State = #dqstate { store = Store } ) ->
+remove_messages(Q, MsgSeqIds, State) ->
MsgIds = lists:foldl(
fun ({MsgId, SeqId}, MsgIdAcc) ->
ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
[MsgId | MsgIdAcc]
end, [], MsgSeqIds),
- Store1 = rabbit_msg_store:remove(MsgIds, Store),
- {ok, State #dqstate { store = Store1 }}.
+ ok = rabbit_msg_store:remove(MsgIds),
+ {ok, State}.
internal_tx_publish(Message = #basic_message { guid = MsgId,
- content = Content },
- State = #dqstate { store = Store }) ->
+ content = Content }, State) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
- Message1 = Message #basic_message { content = ClearedContent },
- Store1 = rabbit_msg_store:write(MsgId, Message1, Store),
- {ok, State #dqstate { store = Store1 }}.
+ ok = rabbit_msg_store:write(
+ MsgId, Message #basic_message { content = ClearedContent }),
+ {ok, State}.
internal_tx_commit(Q, PubMsgIds, AckSeqIds, From,
- State = #dqstate { store = Store, on_sync_txns = Txns }) ->
+ State = #dqstate { on_sync_txns = Txns }) ->
TxnDetails = {Q, PubMsgIds, AckSeqIds, From},
case rabbit_msg_store:needs_sync(
- [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds],
- Store) of
+ [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds]) of
true -> Txns1 = [TxnDetails | Txns],
State #dqstate { on_sync_txns = Txns1 };
false -> internal_do_tx_commit(TxnDetails, State)
@@ -496,14 +490,13 @@ internal_publish(Q, Message = #basic_message { guid = MsgId,
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}),
{ok, {MsgId, WriteSeqId}, State1}.
-internal_tx_rollback(MsgIds, State = #dqstate { store = Store }) ->
- Store1 = rabbit_msg_store:remove(MsgIds, Store),
- {ok, State #dqstate { store = Store1 }}.
+internal_tx_rollback(MsgIds, State) ->
+ ok = rabbit_msg_store:remove(MsgIds),
+ {ok, State}.
internal_requeue(_Q, [], State) ->
{ok, State};
-internal_requeue(Q, MsgSeqIds, State = #dqstate { store = Store,
- sequences = Sequences }) ->
+internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
%% already been delivered). We also know that the rows for these
@@ -536,8 +529,8 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { store = Store,
MsgSeqIds)
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
- Store1 = rabbit_msg_store:release(MsgIds, Store),
- {ok, State #dqstate { store = Store1 }}.
+ ok = rabbit_msg_store:release(MsgIds),
+ {ok, State}.
requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
@@ -551,8 +544,7 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
{WriteSeqId + 1, Q, [MsgId | Acc]}.
%% move the next N messages from the front of the queue to the back.
-internal_requeue_next_n(Q, N, State = #dqstate { store = Store,
- sequences = Sequences }) ->
+internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
{ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
true ->
@@ -564,8 +556,8 @@ internal_requeue_next_n(Q, N, State = #dqstate { store = Store,
end
),
true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
- Store1 = rabbit_msg_store:release(MsgIds, Store),
- {ok, State #dqstate { store = Store1 }}
+ ok = rabbit_msg_store:release(MsgIds),
+ {ok, State}
end.
requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
@@ -705,16 +697,16 @@ prune_flush_batch(DeleteAcc) ->
mnesia:dirty_delete(rabbit_disk_queue, Key)
end, ok, DeleteAcc).
-prune(Store) ->
- prune(Store, mnesia:dirty_first(rabbit_disk_queue), [], 0).
+prune() ->
+ prune(mnesia:dirty_first(rabbit_disk_queue), [], 0).
-prune(_Store, '$end_of_table', DeleteAcc, _Len) ->
+prune('$end_of_table', DeleteAcc, _Len) ->
prune_flush_batch(DeleteAcc);
-prune(Store, Key, DeleteAcc, Len) ->
+prune(Key, DeleteAcc, Len) ->
[#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
mnesia:dirty_read(rabbit_disk_queue, Key),
{DeleteAcc1, Len1} =
- case rabbit_msg_store:contains(MsgId, Store) of
+ case rabbit_msg_store:contains(MsgId) of
true -> {DeleteAcc, Len};
false -> {[{Q, SeqId} | DeleteAcc], Len + 1}
end,
@@ -726,10 +718,10 @@ prune(Store, Key, DeleteAcc, Len) ->
%% start up in constant memory
ok = prune_flush_batch(DeleteAcc1),
NextKey = mnesia:dirty_first(rabbit_disk_queue),
- prune(Store, NextKey, [], 0);
+ prune(NextKey, [], 0);
true ->
NextKey = mnesia:dirty_next(rabbit_disk_queue, Key),
- prune(Store, NextKey, DeleteAcc1, Len1)
+ prune(NextKey, DeleteAcc1, Len1)
end.
extract_sequence_numbers(Sequences) ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 06c61f35..ef973d8a 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -31,14 +31,44 @@
-module(rabbit_msg_store).
--export([init/3, write/3, read/2, contains/2, remove/2, release/2,
- needs_sync/2, sync/1, cleanup/1]).
+-behaviour(gen_server2).
-%%----------------------------------------------------------------------------
+-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
+ needs_sync/1, sync/0, stop/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
-define(MAX_READ_FILE_HANDLES, 256).
-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(msg_id() :: binary()).
+-type(msg() :: any()).
+-type(file_path() :: any()).
+
+-spec(start_link/3 ::
+ (file_path(),
+ (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) ->
+ {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(write/2 :: (msg_id(), msg()) -> 'ok').
+-spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
+-spec(contains/1 :: (msg_id()) -> boolean()).
+-spec(remove/1 :: ([msg_id()]) -> 'ok').
+-spec(release/1 :: ([msg_id()]) -> 'ok').
+-spec(needs_sync/1 :: ([msg_id()]) -> boolean()).
+-spec(sync/0 :: () -> 'ok').
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
-record(msstate,
{dir, %% store directory
msg_locations, %% where are messages?
@@ -64,53 +94,12 @@
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
--define(FILE_EXTENSION_DETS, ".dets").
-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read, read_ahead]).
-define(WRITE_MODE, [write, delayed_write]).
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(ets_table() :: any()).
--type(msg_id() :: binary()).
--type(msg() :: any()).
--type(file_path() :: any()).
--type(io_device() :: any()).
-
--type(msstate() :: #msstate {
- dir :: file_path(),
- msg_locations :: ets_table(),
- file_summary :: ets_table(),
- current_file :: non_neg_integer(),
- current_file_handle :: io_device(),
- current_offset :: non_neg_integer(),
- current_dirty :: boolean(),
- file_size_limit :: non_neg_integer(),
- read_file_handle_cache :: any(),
- last_sync_offset :: non_neg_integer(),
- message_cache :: ets_table()
- }).
-
--spec(init/3 :: (file_path(),
- (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})),
- A) -> msstate()).
--spec(write/3 :: (msg_id(), msg(), msstate()) -> msstate()).
--spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found').
--spec(contains/2 :: (msg_id(), msstate()) -> boolean()).
--spec(remove/2 :: ([msg_id()], msstate()) -> msstate()).
--spec(release/2 :: ([msg_id()], msstate()) -> msstate()).
--spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()).
--spec(sync/1 :: (msstate()) -> msstate()).
--spec(cleanup/1 :: (msstate()) -> msstate()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
%% The components:
%%
%% MsgLocation: this is an ets table which contains:
@@ -233,7 +222,25 @@
%% public API
%%----------------------------------------------------------------------------
-init(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
+start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
+ [{timeout, infinity}]).
+
+write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
+contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
+remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
+release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
+needs_sync(MsgIds) -> gen_server2:call(?SERVER, {needs_sync, MsgIds}, infinity).
+sync() -> gen_server2:call(?SERVER, sync, infinity).
+stop() -> gen_server2:call(?SERVER, stop, infinity).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
MsgLocations = ets:new(?MSG_LOC_NAME,
[set, private, {keypos, #msg_location.msg_id}]),
@@ -275,47 +282,11 @@ init(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
?WRITE_MODE ++ [read]),
{ok, Offset} = file:position(FileHdl, Offset),
- State1 #msstate { current_file_handle = FileHdl }.
-
-write(MsgId, Msg, State = #msstate { current_file_handle = CurHdl,
- current_file = CurFile,
- current_offset = CurOffset,
- file_summary = FileSummary }) ->
- case index_lookup(MsgId, State) of
- not_found ->
- %% New message, lots to do
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
- ok = index_insert(#msg_location {
- msg_id = MsgId, ref_count = 1, file = CurFile,
- offset = CurOffset, total_size = TotalSize },
- State),
- [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- right = undefined }] =
- ets:lookup(FileSummary, CurFile),
- ValidTotalSize1 = ValidTotalSize + TotalSize,
- ContiguousTop1 = if CurOffset =:= ContiguousTop ->
- %% can't be any holes in this file
- ValidTotalSize1;
- true -> ContiguousTop
- end,
- true = ets:insert(FileSummary, FSEntry #file_summary {
- valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
- NextOffset = CurOffset + TotalSize,
- maybe_roll_to_new_file(
- NextOffset, State #msstate {current_offset = NextOffset,
- current_dirty = true});
- StoreEntry = #msg_location { ref_count = RefCount } ->
- %% We already know about it, just update counter
- ok = index_update(StoreEntry #msg_location {
- ref_count = RefCount + 1 }, State),
- State
- end.
+ {ok, State1 #msstate { current_file_handle = FileHdl }}.
-read(MsgId, State) ->
+handle_call({read, MsgId}, _From, State) ->
case index_lookup(MsgId, State) of
- not_found -> not_found;
+ not_found -> reply(not_found, State);
#msg_location { ref_count = RefCount,
file = File,
offset = Offset,
@@ -347,57 +318,100 @@ read(MsgId, State) ->
%% message. So don't bother
%% putting it in the cache.
end,
- {Msg, State1};
+ reply({ok, Msg}, State1);
{Msg, _RefCount} ->
- {Msg, State}
+ reply({ok, Msg}, State)
end
- end.
-
-contains(MsgId, State) ->
+ end;
+
+handle_call({contains, MsgId}, _From, State) ->
+ reply(case index_lookup(MsgId, State) of
+ not_found -> false;
+ #msg_location {} -> true
+ end, State);
+
+handle_call({needs_sync, _MsgIds}, _From,
+ State = #msstate { current_dirty = false }) ->
+ reply(false, State);
+handle_call({needs_sync, MsgIds}, _From,
+ State = #msstate { current_file = CurFile,
+ last_sync_offset = SyncOffset }) ->
+ reply(lists:any(fun (MsgId) ->
+ #msg_location { file = File, offset = Offset } =
+ index_lookup(MsgId, State),
+ File =:= CurFile andalso Offset >= SyncOffset
+ end, MsgIds), State);
+
+handle_call(sync, _From, State) ->
+ reply(ok, sync(State));
+
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast({write, MsgId, Msg},
+ State = #msstate { current_file_handle = CurHdl,
+ current_file = CurFile,
+ current_offset = CurOffset,
+ file_summary = FileSummary }) ->
case index_lookup(MsgId, State) of
- not_found -> false;
- #msg_location {} -> true
- end.
-
-remove(MsgIds, State = #msstate { current_file = CurFile }) ->
- compact(sets:to_list(
- lists:foldl(
- fun (MsgId, Files1) ->
- case remove_message(MsgId, State) of
- {compact, File} ->
- if CurFile =:= File -> Files1;
- true -> sets:add_element(File, Files1)
- end;
- no_compact -> Files1
+ not_found ->
+ %% New message, lots to do
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
+ ok = index_insert(#msg_location {
+ msg_id = MsgId, ref_count = 1, file = CurFile,
+ offset = CurOffset, total_size = TotalSize },
+ State),
+ [FSEntry = #file_summary { valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop,
+ right = undefined }] =
+ ets:lookup(FileSummary, CurFile),
+ ValidTotalSize1 = ValidTotalSize + TotalSize,
+ ContiguousTop1 = if CurOffset =:= ContiguousTop ->
+ %% can't be any holes in this file
+ ValidTotalSize1;
+ true -> ContiguousTop
+ end,
+ true = ets:insert(FileSummary, FSEntry #file_summary {
+ valid_total_size = ValidTotalSize1,
+ contiguous_top = ContiguousTop1 }),
+ NextOffset = CurOffset + TotalSize,
+ noreply(
+ maybe_roll_to_new_file(
+ NextOffset, State #msstate {current_offset = NextOffset,
+ current_dirty = true}));
+ StoreEntry = #msg_location { ref_count = RefCount } ->
+ %% We already know about it, just update counter
+ ok = index_update(StoreEntry #msg_location {
+ ref_count = RefCount + 1 }, State),
+ noreply(State)
+ end;
+
+handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
+ noreply(
+ compact(sets:to_list(
+ lists:foldl(
+ fun (MsgId, Files1) ->
+ case remove_message(MsgId, State) of
+ {compact, File} ->
+ if CurFile =:= File -> Files1;
+ true -> sets:add_element(File, Files1)
+ end;
+ no_compact -> Files1
end
- end, sets:new(), MsgIds)),
- State).
+ end, sets:new(), MsgIds)),
+ State));
-release(MsgIds, State) ->
+handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
- State.
+ noreply(State).
-needs_sync(_MsgIds, #msstate { current_dirty = false }) ->
- false;
-needs_sync(MsgIds, State = #msstate { current_file = CurFile,
- last_sync_offset = SyncOffset }) ->
- lists:any(fun (MsgId) ->
- #msg_location { file = File, offset = Offset } =
- index_lookup(MsgId, State),
- File =:= CurFile andalso Offset >= SyncOffset
- end, MsgIds).
+handle_info(_Info, State) ->
+ noreply(State).
-sync(State = #msstate { current_dirty = false }) ->
- State;
-sync(State = #msstate { current_file_handle = CurHdl,
- current_offset = CurOffset }) ->
- ok = file:sync(CurHdl),
- State #msstate { current_dirty = false, last_sync_offset = CurOffset }.
-
-cleanup(State = #msstate { msg_locations = MsgLocations,
- file_summary = FileSummary,
- current_file_handle = FileHdl,
- read_file_handle_cache = HC }) ->
+terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
+ file_summary = FileSummary,
+ current_file_handle = FileHdl,
+ read_file_handle_cache = HC }) ->
State1 = case FileHdl of
undefined -> State;
_ -> State2 = sync(State),
@@ -411,13 +425,19 @@ cleanup(State = #msstate { msg_locations = MsgLocations,
file_summary = undefined,
current_file_handle = undefined,
current_dirty = false,
- read_file_handle_cache = HC1
- }.
+ read_file_handle_cache = HC1 }.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
+noreply(State) -> {noreply, State}.
+
+reply(Reply, State) -> {reply, Reply, State}.
+
form_filename(Dir, Name) -> filename:join(Dir, Name).
filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
@@ -442,6 +462,13 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
ok = file:truncate(FileHdl),
ok = preallocate(FileHdl, Highpoint, Lowpoint).
+sync(State = #msstate { current_dirty = false }) ->
+ State;
+sync(State = #msstate { current_file_handle = CurHdl,
+ current_offset = CurOffset }) ->
+ ok = file:sync(CurHdl),
+ State #msstate { current_dirty = false, last_sync_offset = CurOffset }.
+
with_read_handle_at(File, Offset, Fun,
State = #msstate { dir = Dir,
read_file_handle_cache = HC,