summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-24 20:33:25 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-24 20:33:25 +0000
commited49d7eda7f92189eee162134198189ca80d0e6f (patch)
tree0d082f26e914f9cef46af5efbb9bc4f4c1ded6ab
parentfe0851e4573f2abe13c847e2f3c3f36c3f307d39 (diff)
parentcb4199ea10659ef67e5b35a97d1a0e4ece5c5dbb (diff)
downloadrabbitmq-server-ed49d7eda7f92189eee162134198189ca80d0e6f.tar.gz
merge default into bug25853
containing the recently merged bug25827
-rw-r--r--src/rabbit_queue_index.erl53
1 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index b5a316f0..8f2abb55 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
recover_journal(State),
- {Segments1, Count} =
+ {Segments1, Count, DirtyCount} =
%% Load each segment in turn and filter out messages that are
%% not in the msg_store, by adding acks to the journal. These
%% acks only go to the RAM journal as it doesn't matter if we
%% lose them. Also mark delivered if not clean shutdown. Also
- %% find the number of unacked messages.
+ %% find the number of unacked messages. Also accumulate the
+ %% dirty count here, so we can call maybe_flush_journal below
+ %% and avoid unnecessary file system operations.
lists:foldl(
- fun (Seg, {Segments2, CountAcc}) ->
- Segment = #segment { unacked = UnackedCount } =
+ fun (Seg, {Segments2, CountAcc, DirtyCount}) ->
+ {Segment = #segment { unacked = UnackedCount }, Dirty} =
recover_segment(ContainsCheckFun, CleanShutdown,
segment_find_or_new(Seg, Dir, Segments2)),
- {segment_store(Segment, Segments2), CountAcc + UnackedCount}
- end, {Segments, 0}, all_segment_nums(State1)),
- %% Unconditionally flush since the dirty_count doesn't get updated
- %% by the above foldl.
- State2 = flush_journal(State1 #qistate { segments = Segments1 }),
+ {segment_store(Segment, Segments2),
+ CountAcc + UnackedCount, DirtyCount + Dirty}
+ end, {Segments, 0, 0}, all_segment_nums(State1)),
+ State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
+ dirty_count = DirtyCount }),
{Count, State2}.
terminate(State = #qistate { journal_handle = JournalHdl,
@@ -467,19 +469,21 @@ recover_segment(ContainsCheckFun, CleanShutdown,
recover_message(ContainsCheckFun(MsgId), CleanShutdown,
Del, RelSeq, Segment1)
end,
- Segment #segment { unacked = UnackedCount + UnackedCountDelta },
+ {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0},
SegEntries1).
recover_message( true, true, _Del, _RelSeq, Segment) ->
Segment;
recover_message( true, false, del, _RelSeq, Segment) ->
Segment;
-recover_message( true, false, no_del, RelSeq, Segment) ->
- add_to_journal(RelSeq, del, Segment);
-recover_message(false, _, del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, Segment);
-recover_message(false, _, no_del, RelSeq, Segment) ->
- add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
+recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, del, Segment), DirtyCount + 1};
+recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
+recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
+ {add_to_journal(RelSeq, ack,
+ add_to_journal(RelSeq, del, Segment)),
+ DirtyCount + 1}.
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
@@ -651,9 +655,20 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State) ->
- {JournalHdl, State1} = get_journal_handle(State),
- {ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1).
+ case is_journal_present(State) of
+ true ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ {ok, 0} = file_handle_cache:position(JournalHdl, 0),
+ load_journal_entries(State1);
+ false ->
+ State
+ end.
+
+is_journal_present(#qistate { journal_handle = undefined,
+ dir = Dir }) ->
+ rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME));
+is_journal_present(_) ->
+ true.
%% ditto
recover_journal(State) ->