diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-09-25 14:49:54 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-09-25 14:49:54 +0100 |
commit | fa65a1266274ad41b1addf15c83112ee33d6f69d (patch) | |
tree | 52a8b978d1b6056d350bc11bee654dccef635b23 | |
parent | 577a216cc49da650fd0f42a64a684a7e66bede30 (diff) | |
download | rabbitmq-server-fa65a1266274ad41b1addf15c83112ee33d6f69d.tar.gz |
turn the msg_store into a separate process
-rw-r--r-- | src/rabbit_disk_queue.erl | 96 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 285 |
2 files changed, 200 insertions, 181 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 6fe2a4b3..02a8ed8c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -70,8 +70,7 @@ -define(SERVER, ?MODULE). -record(dqstate, - {store, %% message store - sequences, %% next read and write for each q + {sequences, %% next read and write for each q on_sync_txns, %% list of commiters to run on sync (reversed) commit_timer_ref %% TRef for our interval timer }). @@ -196,15 +195,15 @@ init([]) -> ok = detect_shutdown_state_and_adjust_delivered_flags(), - Store = rabbit_msg_store:init(base_directory(), - fun msg_ref_gen/1, msg_ref_gen_init()), - ok = prune(Store), + {ok, _Pid} = rabbit_msg_store:start_link(base_directory(), + fun msg_ref_gen/1, + msg_ref_gen_init()), + ok = prune(), Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), ok = extract_sequence_numbers(Sequences), - State = #dqstate { store = Store, - sequences = Sequences, + State = #dqstate { sequences = Sequences, on_sync_txns = [], commit_timer_ref = undefined }, {ok, State, hibernate, @@ -291,11 +290,11 @@ terminate(_Reason, State) -> shutdown(State = #dqstate { sequences = undefined }) -> State; -shutdown(State = #dqstate { sequences = Sequences, store = Store }) -> +shutdown(State = #dqstate { sequences = Sequences }) -> State1 = stop_commit_timer(State), - Store1 = rabbit_msg_store:cleanup(Store), + ok = rabbit_msg_store:stop(), ets:delete(Sequences), - State1 #dqstate { sequences = undefined, store = Store1 }. + State1 #dqstate { sequences = undefined }. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -350,12 +349,12 @@ stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #dqstate { commit_timer_ref = undefined }. -sync(State = #dqstate { store = Store, on_sync_txns = Txns }) -> - State1 = State #dqstate { store = rabbit_msg_store:sync(Store) }, +sync(State = #dqstate { on_sync_txns = Txns }) -> + ok = rabbit_msg_store:sync(), case Txns of - [] -> State1; + [] -> State; _ -> lists:foldl(fun internal_do_tx_commit/2, - State1 #dqstate { on_sync_txns = [] }, + State #dqstate { on_sync_txns = [] }, lists:reverse(Txns)) end. @@ -363,13 +362,12 @@ sync(State = #dqstate { store = Store, on_sync_txns = Txns }) -> %% internal functions %%---------------------------------------------------------------------------- -internal_fetch_body(Q, Advance, State = #dqstate { store = Store }) -> +internal_fetch_body(Q, Advance, State) -> case next(Q, record_delivery, Advance, State) of empty -> {empty, State}; {MsgId, IsDelivered, AckTag, Remaining} -> - {Message, Store1} = rabbit_msg_store:read(MsgId, Store), - State1 = State #dqstate { store = Store1 }, - {{Message, IsDelivered, AckTag, Remaining}, State1} + {ok, Message} = rabbit_msg_store:read(MsgId), + {{Message, IsDelivered, AckTag, Remaining}, State} end. internal_fetch_attributes(Q, MarkDelivered, State) -> @@ -413,41 +411,37 @@ internal_foldl(Q, Fun, Init, State) -> internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> {ok, Acc, State}; -internal_foldl(Q, WriteSeqId, Fun, State = #dqstate { store = Store }, - Acc, ReadSeqId) -> +internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> [#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - {Message, Store1} = rabbit_msg_store:read(MsgId, Store), + {ok, Message} = rabbit_msg_store:read(MsgId), Acc1 = Fun(Message, {MsgId, ReadSeqId}, IsDelivered, Acc), - internal_foldl(Q, WriteSeqId, Fun, State #dqstate { store = Store1 }, - Acc1, ReadSeqId + 1). + internal_foldl(Q, WriteSeqId, Fun, State, Acc1, ReadSeqId + 1). internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, State). -remove_messages(Q, MsgSeqIds, State = #dqstate { store = Store } ) -> +remove_messages(Q, MsgSeqIds, State) -> MsgIds = lists:foldl( fun ({MsgId, SeqId}, MsgIdAcc) -> ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}), [MsgId | MsgIdAcc] end, [], MsgSeqIds), - Store1 = rabbit_msg_store:remove(MsgIds, Store), - {ok, State #dqstate { store = Store1 }}. + ok = rabbit_msg_store:remove(MsgIds), + {ok, State}. internal_tx_publish(Message = #basic_message { guid = MsgId, - content = Content }, - State = #dqstate { store = Store }) -> + content = Content }, State) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), - Message1 = Message #basic_message { content = ClearedContent }, - Store1 = rabbit_msg_store:write(MsgId, Message1, Store), - {ok, State #dqstate { store = Store1 }}. + ok = rabbit_msg_store:write( + MsgId, Message #basic_message { content = ClearedContent }), + {ok, State}. internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, - State = #dqstate { store = Store, on_sync_txns = Txns }) -> + State = #dqstate { on_sync_txns = Txns }) -> TxnDetails = {Q, PubMsgIds, AckSeqIds, From}, case rabbit_msg_store:needs_sync( - [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds], - Store) of + [MsgId || {MsgId, _IsDelivered, _IsPersistent} <- PubMsgIds]) of true -> Txns1 = [TxnDetails | Txns], State #dqstate { on_sync_txns = Txns1 }; false -> internal_do_tx_commit(TxnDetails, State) @@ -496,14 +490,13 @@ internal_publish(Q, Message = #basic_message { guid = MsgId, true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}), {ok, {MsgId, WriteSeqId}, State1}. -internal_tx_rollback(MsgIds, State = #dqstate { store = Store }) -> - Store1 = rabbit_msg_store:remove(MsgIds, Store), - {ok, State #dqstate { store = Store1 }}. +internal_tx_rollback(MsgIds, State) -> + ok = rabbit_msg_store:remove(MsgIds), + {ok, State}. internal_requeue(_Q, [], State) -> {ok, State}; -internal_requeue(Q, MsgSeqIds, State = #dqstate { store = Store, - sequences = Sequences }) -> +internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've %% already been delivered). We also know that the rows for these @@ -536,8 +529,8 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { store = Store, MsgSeqIds) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}), - Store1 = rabbit_msg_store:release(MsgIds, Store), - {ok, State #dqstate { store = Store1 }}. + ok = rabbit_msg_store:release(MsgIds), + {ok, State}. requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = @@ -551,8 +544,7 @@ requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> {WriteSeqId + 1, Q, [MsgId | Acc]}. %% move the next N messages from the front of the queue to the back. -internal_requeue_next_n(Q, N, State = #dqstate { store = Store, - sequences = Sequences }) -> +internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), if N >= (WriteSeqId - ReadSeqId) -> {ok, State}; true -> @@ -564,8 +556,8 @@ internal_requeue_next_n(Q, N, State = #dqstate { store = Store, end ), true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), - Store1 = rabbit_msg_store:release(MsgIds, Store), - {ok, State #dqstate { store = Store1 }} + ok = rabbit_msg_store:release(MsgIds), + {ok, State} end. requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) -> @@ -705,16 +697,16 @@ prune_flush_batch(DeleteAcc) -> mnesia:dirty_delete(rabbit_disk_queue, Key) end, ok, DeleteAcc). -prune(Store) -> - prune(Store, mnesia:dirty_first(rabbit_disk_queue), [], 0). +prune() -> + prune(mnesia:dirty_first(rabbit_disk_queue), [], 0). -prune(_Store, '$end_of_table', DeleteAcc, _Len) -> +prune('$end_of_table', DeleteAcc, _Len) -> prune_flush_batch(DeleteAcc); -prune(Store, Key, DeleteAcc, Len) -> +prune(Key, DeleteAcc, Len) -> [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = mnesia:dirty_read(rabbit_disk_queue, Key), {DeleteAcc1, Len1} = - case rabbit_msg_store:contains(MsgId, Store) of + case rabbit_msg_store:contains(MsgId) of true -> {DeleteAcc, Len}; false -> {[{Q, SeqId} | DeleteAcc], Len + 1} end, @@ -726,10 +718,10 @@ prune(Store, Key, DeleteAcc, Len) -> %% start up in constant memory ok = prune_flush_batch(DeleteAcc1), NextKey = mnesia:dirty_first(rabbit_disk_queue), - prune(Store, NextKey, [], 0); + prune(NextKey, [], 0); true -> NextKey = mnesia:dirty_next(rabbit_disk_queue, Key), - prune(Store, NextKey, DeleteAcc1, Len1) + prune(NextKey, DeleteAcc1, Len1) end. extract_sequence_numbers(Sequences) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 06c61f35..ef973d8a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -31,14 +31,44 @@ -module(rabbit_msg_store). --export([init/3, write/3, read/2, contains/2, remove/2, release/2, - needs_sync/2, sync/1, cleanup/1]). +-behaviour(gen_server2). -%%---------------------------------------------------------------------------- +-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, + needs_sync/1, sync/0, stop/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). -define(MAX_READ_FILE_HANDLES, 256). -define(FILE_SIZE_LIMIT, (256*1024*1024)). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(msg_id() :: binary()). +-type(msg() :: any()). +-type(file_path() :: any()). + +-spec(start_link/3 :: + (file_path(), + (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> + {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(write/2 :: (msg_id(), msg()) -> 'ok'). +-spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). +-spec(contains/1 :: (msg_id()) -> boolean()). +-spec(remove/1 :: ([msg_id()]) -> 'ok'). +-spec(release/1 :: ([msg_id()]) -> 'ok'). +-spec(needs_sync/1 :: ([msg_id()]) -> boolean()). +-spec(sync/0 :: () -> 'ok'). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + -record(msstate, {dir, %% store directory msg_locations, %% where are messages? @@ -64,53 +94,12 @@ -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). --define(FILE_EXTENSION_DETS, ".dets"). -define(CACHE_ETS_NAME, rabbit_disk_queue_cache). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read, read_ahead]). -define(WRITE_MODE, [write, delayed_write]). -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(ets_table() :: any()). --type(msg_id() :: binary()). --type(msg() :: any()). --type(file_path() :: any()). --type(io_device() :: any()). - --type(msstate() :: #msstate { - dir :: file_path(), - msg_locations :: ets_table(), - file_summary :: ets_table(), - current_file :: non_neg_integer(), - current_file_handle :: io_device(), - current_offset :: non_neg_integer(), - current_dirty :: boolean(), - file_size_limit :: non_neg_integer(), - read_file_handle_cache :: any(), - last_sync_offset :: non_neg_integer(), - message_cache :: ets_table() - }). - --spec(init/3 :: (file_path(), - (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), - A) -> msstate()). --spec(write/3 :: (msg_id(), msg(), msstate()) -> msstate()). --spec(read/2 :: (msg_id(), msstate()) -> {msg(), msstate()} | 'not_found'). --spec(contains/2 :: (msg_id(), msstate()) -> boolean()). --spec(remove/2 :: ([msg_id()], msstate()) -> msstate()). --spec(release/2 :: ([msg_id()], msstate()) -> msstate()). --spec(needs_sync/2 :: ([msg_id()], msstate()) -> boolean()). --spec(sync/1 :: (msstate()) -> msstate()). --spec(cleanup/1 :: (msstate()) -> msstate()). - --endif. - -%%---------------------------------------------------------------------------- - %% The components: %% %% MsgLocation: this is an ets table which contains: @@ -233,7 +222,25 @@ %% public API %%---------------------------------------------------------------------------- -init(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> +start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, + [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], + [{timeout, infinity}]). + +write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). +read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). +contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). +remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). +release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). +needs_sync(MsgIds) -> gen_server2:call(?SERVER, {needs_sync, MsgIds}, infinity). +sync() -> gen_server2:call(?SERVER, sync, infinity). +stop() -> gen_server2:call(?SERVER, stop, infinity). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> MsgLocations = ets:new(?MSG_LOC_NAME, [set, private, {keypos, #msg_location.msg_id}]), @@ -275,47 +282,11 @@ init(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> ?WRITE_MODE ++ [read]), {ok, Offset} = file:position(FileHdl, Offset), - State1 #msstate { current_file_handle = FileHdl }. - -write(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - current_offset = CurOffset, - file_summary = FileSummary }) -> - case index_lookup(MsgId, State) of - not_found -> - %% New message, lots to do - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), - ok = index_insert(#msg_location { - msg_id = MsgId, ref_count = 1, file = CurFile, - offset = CurOffset, total_size = TotalSize }, - State), - [FSEntry = #file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - right = undefined }] = - ets:lookup(FileSummary, CurFile), - ValidTotalSize1 = ValidTotalSize + TotalSize, - ContiguousTop1 = if CurOffset =:= ContiguousTop -> - %% can't be any holes in this file - ValidTotalSize1; - true -> ContiguousTop - end, - true = ets:insert(FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), - NextOffset = CurOffset + TotalSize, - maybe_roll_to_new_file( - NextOffset, State #msstate {current_offset = NextOffset, - current_dirty = true}); - StoreEntry = #msg_location { ref_count = RefCount } -> - %% We already know about it, just update counter - ok = index_update(StoreEntry #msg_location { - ref_count = RefCount + 1 }, State), - State - end. + {ok, State1 #msstate { current_file_handle = FileHdl }}. -read(MsgId, State) -> +handle_call({read, MsgId}, _From, State) -> case index_lookup(MsgId, State) of - not_found -> not_found; + not_found -> reply(not_found, State); #msg_location { ref_count = RefCount, file = File, offset = Offset, @@ -347,57 +318,100 @@ read(MsgId, State) -> %% message. So don't bother %% putting it in the cache. end, - {Msg, State1}; + reply({ok, Msg}, State1); {Msg, _RefCount} -> - {Msg, State} + reply({ok, Msg}, State) end - end. - -contains(MsgId, State) -> + end; + +handle_call({contains, MsgId}, _From, State) -> + reply(case index_lookup(MsgId, State) of + not_found -> false; + #msg_location {} -> true + end, State); + +handle_call({needs_sync, _MsgIds}, _From, + State = #msstate { current_dirty = false }) -> + reply(false, State); +handle_call({needs_sync, MsgIds}, _From, + State = #msstate { current_file = CurFile, + last_sync_offset = SyncOffset }) -> + reply(lists:any(fun (MsgId) -> + #msg_location { file = File, offset = Offset } = + index_lookup(MsgId, State), + File =:= CurFile andalso Offset >= SyncOffset + end, MsgIds), State); + +handle_call(sync, _From, State) -> + reply(ok, sync(State)); + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +handle_cast({write, MsgId, Msg}, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + current_offset = CurOffset, + file_summary = FileSummary }) -> case index_lookup(MsgId, State) of - not_found -> false; - #msg_location {} -> true - end. - -remove(MsgIds, State = #msstate { current_file = CurFile }) -> - compact(sets:to_list( - lists:foldl( - fun (MsgId, Files1) -> - case remove_message(MsgId, State) of - {compact, File} -> - if CurFile =:= File -> Files1; - true -> sets:add_element(File, Files1) - end; - no_compact -> Files1 + not_found -> + %% New message, lots to do + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), + ok = index_insert(#msg_location { + msg_id = MsgId, ref_count = 1, file = CurFile, + offset = CurOffset, total_size = TotalSize }, + State), + [FSEntry = #file_summary { valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + right = undefined }] = + ets:lookup(FileSummary, CurFile), + ValidTotalSize1 = ValidTotalSize + TotalSize, + ContiguousTop1 = if CurOffset =:= ContiguousTop -> + %% can't be any holes in this file + ValidTotalSize1; + true -> ContiguousTop + end, + true = ets:insert(FileSummary, FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), + NextOffset = CurOffset + TotalSize, + noreply( + maybe_roll_to_new_file( + NextOffset, State #msstate {current_offset = NextOffset, + current_dirty = true})); + StoreEntry = #msg_location { ref_count = RefCount } -> + %% We already know about it, just update counter + ok = index_update(StoreEntry #msg_location { + ref_count = RefCount + 1 }, State), + noreply(State) + end; + +handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> + noreply( + compact(sets:to_list( + lists:foldl( + fun (MsgId, Files1) -> + case remove_message(MsgId, State) of + {compact, File} -> + if CurFile =:= File -> Files1; + true -> sets:add_element(File, Files1) + end; + no_compact -> Files1 end - end, sets:new(), MsgIds)), - State). + end, sets:new(), MsgIds)), + State)); -release(MsgIds, State) -> +handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), - State. + noreply(State). -needs_sync(_MsgIds, #msstate { current_dirty = false }) -> - false; -needs_sync(MsgIds, State = #msstate { current_file = CurFile, - last_sync_offset = SyncOffset }) -> - lists:any(fun (MsgId) -> - #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), - File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds). +handle_info(_Info, State) -> + noreply(State). -sync(State = #msstate { current_dirty = false }) -> - State; -sync(State = #msstate { current_file_handle = CurHdl, - current_offset = CurOffset }) -> - ok = file:sync(CurHdl), - State #msstate { current_dirty = false, last_sync_offset = CurOffset }. - -cleanup(State = #msstate { msg_locations = MsgLocations, - file_summary = FileSummary, - current_file_handle = FileHdl, - read_file_handle_cache = HC }) -> +terminate(_Reason, State = #msstate { msg_locations = MsgLocations, + file_summary = FileSummary, + current_file_handle = FileHdl, + read_file_handle_cache = HC }) -> State1 = case FileHdl of undefined -> State; _ -> State2 = sync(State), @@ -411,13 +425,19 @@ cleanup(State = #msstate { msg_locations = MsgLocations, file_summary = undefined, current_file_handle = undefined, current_dirty = false, - read_file_handle_cache = HC1 - }. + read_file_handle_cache = HC1 }. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- +noreply(State) -> {noreply, State}. + +reply(Reply, State) -> {reply, Reply, State}. + form_filename(Dir, Name) -> filename:join(Dir, Name). filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. @@ -442,6 +462,13 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> ok = file:truncate(FileHdl), ok = preallocate(FileHdl, Highpoint, Lowpoint). +sync(State = #msstate { current_dirty = false }) -> + State; +sync(State = #msstate { current_file_handle = CurHdl, + current_offset = CurOffset }) -> + ok = file:sync(CurHdl), + State #msstate { current_dirty = false, last_sync_offset = CurOffset }. + with_read_handle_at(File, Offset, Fun, State = #msstate { dir = Dir, read_file_handle_cache = HC, |