summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_disk_queue.erl42
1 files changed, 26 insertions, 16 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index ac58d89d..2a7505a7 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -86,7 +86,8 @@
read_file_handles, %% file handles for reading (LRU)
read_file_handles_limit, %% how many file handles can we open?
on_sync_froms, %% list of commiters to run on sync (reversed)
- timer_ref %% TRef for our interval timer
+ timer_ref, %% TRef for our interval timer
+ last_sync_offset %% current_offset at the last time we sync'd
}).
%% The components:
@@ -394,7 +395,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
read_file_handles = {dict:new(), gb_trees:empty()},
read_file_handles_limit = ReadFileHandlesLimit,
on_sync_froms = [],
- timer_ref = undefined
+ timer_ref = undefined,
+ last_sync_offset = 0
},
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
@@ -648,13 +650,14 @@ determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
when NextWrite >= CurrentWrite ->
CurrentRead.
-get_read_handle(File, State =
+get_read_handle(File, Offset, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
- current_dirty = IsDirty
+ current_dirty = IsDirty,
+ last_sync_offset = SyncOffset
}) ->
- State1 = if CurName =:= File andalso IsDirty ->
+ State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
sync_current_file_handle(State);
true -> State
end,
@@ -727,15 +730,19 @@ sync_current_file_handle(State = #dqstate { current_dirty = false,
State;
sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
current_dirty = IsDirty,
- on_sync_froms = Froms
+ current_offset = CurOffset,
+ on_sync_froms = Froms,
+ last_sync_offset = SyncOffset
}) ->
- ok = case IsDirty of
- true -> file:sync(CurHdl);
- false -> ok
- end,
+ SyncOffset1 = case IsDirty of
+ true -> ok = file:sync(CurHdl),
+ CurOffset;
+ false -> SyncOffset
+ end,
lists:map(fun (From) -> gen_server2:reply(From, ok) end,
lists:reverse(Froms)),
- State #dqstate { current_dirty = false, on_sync_froms = [] }.
+ State #dqstate { current_dirty = false, on_sync_froms = [],
+ last_sync_offset = SyncOffset1 }.
%% ---- INTERNAL RAW FUNCTIONS ----
@@ -776,7 +783,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
end,
case ReadMsg of
true ->
- {FileHdl, State1} = get_read_handle(File, State),
+ {FileHdl, State1} = get_read_handle(File, Offset, State),
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
{ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
@@ -883,7 +890,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
State = #dqstate { sequences = Sequences,
current_file_name = CurFile,
current_dirty = IsDirty,
- on_sync_froms = SyncFroms
+ on_sync_froms = SyncFroms,
+ last_sync_offset = SyncOffset
}) ->
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
@@ -909,7 +917,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
lists:foldl(
fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
{InCurFileAcc, ExpectedSeqId}) ->
- [{MsgId, _RefCount, File, _Offset,
+ [{MsgId, _RefCount, File, Offset,
_TotalSize}] = dets_ets_lookup(State, MsgId),
SeqId1 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
@@ -924,7 +932,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
next_seq_id = NextSeqId1
},
write),
- {InCurFileAcc orelse File =:= CurFile,
+ {InCurFileAcc orelse (File =:= CurFile andalso
+ Offset >= SyncOffset),
NextSeqId1}
end, {false, PubAcc}, PubList),
{ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
@@ -1126,7 +1135,8 @@ maybe_roll_to_new_file(Offset,
State2 = State1 #dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
current_file_num = NextNum,
- current_offset = 0
+ current_offset = 0,
+ last_sync_offset = 0
},
{ok, compact(sets:from_list([CurName]), State2)};
maybe_roll_to_new_file(_, State) ->