diff options
-rw-r--r-- | src/rabbit_disk_queue.erl | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index ac58d89d..2a7505a7 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -86,7 +86,8 @@ read_file_handles, %% file handles for reading (LRU) read_file_handles_limit, %% how many file handles can we open? on_sync_froms, %% list of commiters to run on sync (reversed) - timer_ref %% TRef for our interval timer + timer_ref, %% TRef for our interval timer + last_sync_offset %% current_offset at the last time we sync'd }). %% The components: @@ -394,7 +395,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> read_file_handles = {dict:new(), gb_trees:empty()}, read_file_handles_limit = ReadFileHandlesLimit, on_sync_froms = [], - timer_ref = undefined + timer_ref = undefined, + last_sync_offset = 0 }, {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = @@ -648,13 +650,14 @@ determine_next_read_id(CurrentRead, CurrentWrite, NextWrite) when NextWrite >= CurrentWrite -> CurrentRead. -get_read_handle(File, State = +get_read_handle(File, Offset, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit, current_file_name = CurName, - current_dirty = IsDirty + current_dirty = IsDirty, + last_sync_offset = SyncOffset }) -> - State1 = if CurName =:= File andalso IsDirty -> + State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> sync_current_file_handle(State); true -> State end, @@ -727,15 +730,19 @@ sync_current_file_handle(State = #dqstate { current_dirty = false, State; sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, current_dirty = IsDirty, - on_sync_froms = Froms + current_offset = CurOffset, + on_sync_froms = Froms, + last_sync_offset = SyncOffset }) -> - ok = case IsDirty of - true -> file:sync(CurHdl); - false -> ok - end, + SyncOffset1 = case IsDirty of + true -> ok = file:sync(CurHdl), + CurOffset; + false -> SyncOffset + end, lists:map(fun (From) -> gen_server2:reply(From, ok) end, lists:reverse(Froms)), - State #dqstate { current_dirty = false, on_sync_froms = [] }. + State #dqstate { current_dirty = false, on_sync_froms = [], + last_sync_offset = SyncOffset1 }. %% ---- INTERNAL RAW FUNCTIONS ---- @@ -776,7 +783,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> end, case ReadMsg of true -> - {FileHdl, State1} = get_read_handle(File, State), + {FileHdl, State1} = get_read_handle(File, Offset, State), {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, @@ -883,7 +890,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State = #dqstate { sequences = Sequences, current_file_name = CurFile, current_dirty = IsDirty, - on_sync_froms = SyncFroms + on_sync_froms = SyncFroms, + last_sync_offset = SyncOffset }) -> {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of @@ -909,7 +917,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, lists:foldl( fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, {InCurFileAcc, ExpectedSeqId}) -> - [{MsgId, _RefCount, File, _Offset, + [{MsgId, _RefCount, File, Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), SeqId1 = adjust_last_msg_seq_id( Q, ExpectedSeqId, SeqId, write), @@ -924,7 +932,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, next_seq_id = NextSeqId1 }, write), - {InCurFileAcc orelse File =:= CurFile, + {InCurFileAcc orelse (File =:= CurFile andalso + Offset >= SyncOffset), NextSeqId1} end, {false, PubAcc}, PubList), {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), @@ -1126,7 +1135,8 @@ maybe_roll_to_new_file(Offset, State2 = State1 #dqstate { current_file_name = NextName, current_file_handle = NextHdl, current_file_num = NextNum, - current_offset = 0 + current_offset = 0, + last_sync_offset = 0 }, {ok, compact(sets:from_list([CurName]), State2)}; maybe_roll_to_new_file(_, State) -> |