diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-24 20:33:25 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-24 20:33:25 +0000 |
commit | ed49d7eda7f92189eee162134198189ca80d0e6f (patch) | |
tree | 0d082f26e914f9cef46af5efbb9bc4f4c1ded6ab | |
parent | fe0851e4573f2abe13c847e2f3c3f36c3f307d39 (diff) | |
parent | cb4199ea10659ef67e5b35a97d1a0e4ece5c5dbb (diff) | |
download | rabbitmq-server-ed49d7eda7f92189eee162134198189ca80d0e6f.tar.gz |
merge default into bug25853
containing the recently merged bug25827
-rw-r--r-- | src/rabbit_queue_index.erl | 53 |
1 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b5a316f0..8f2abb55 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -424,22 +424,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = recover_journal(State), - {Segments1, Count} = + {Segments1, Count, DirtyCount} = %% Load each segment in turn and filter out messages that are %% not in the msg_store, by adding acks to the journal. These %% acks only go to the RAM journal as it doesn't matter if we %% lose them. Also mark delivered if not clean shutdown. Also - %% find the number of unacked messages. + %% find the number of unacked messages. Also accumulate the + %% dirty count here, so we can call maybe_flush_journal below + %% and avoid unnecessary file system operations. lists:foldl( - fun (Seg, {Segments2, CountAcc}) -> - Segment = #segment { unacked = UnackedCount } = + fun (Seg, {Segments2, CountAcc, DirtyCount}) -> + {Segment = #segment { unacked = UnackedCount }, Dirty} = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), - {segment_store(Segment, Segments2), CountAcc + UnackedCount} - end, {Segments, 0}, all_segment_nums(State1)), - %% Unconditionally flush since the dirty_count doesn't get updated - %% by the above foldl. - State2 = flush_journal(State1 #qistate { segments = Segments1 }), + {segment_store(Segment, Segments2), + CountAcc + UnackedCount, DirtyCount + Dirty} + end, {Segments, 0, 0}, all_segment_nums(State1)), + State2 = maybe_flush_journal(State1 #qistate { segments = Segments1, + dirty_count = DirtyCount }), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -467,19 +469,21 @@ recover_segment(ContainsCheckFun, CleanShutdown, recover_message(ContainsCheckFun(MsgId), CleanShutdown, Del, RelSeq, Segment1) end, - Segment #segment { unacked = UnackedCount + UnackedCountDelta }, + {Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, SegEntries1). recover_message( true, true, _Del, _RelSeq, Segment) -> Segment; recover_message( true, false, del, _RelSeq, Segment) -> Segment; -recover_message( true, false, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, del, Segment); -recover_message(false, _, del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, Segment); -recover_message(false, _, no_del, RelSeq, Segment) -> - add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). +recover_message( true, false, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, del, Segment), DirtyCount + 1}; +recover_message(false, _, del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1}; +recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> + {add_to_journal(RelSeq, ack, + add_to_journal(RelSeq, del, Segment)), + DirtyCount + 1}. queue_name_to_dir_name(Name = #resource { kind = queue }) -> <<Num:128>> = erlang:md5(term_to_binary(Name)), @@ -651,9 +655,20 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> %% if you call it more than once on the same state. Assumes the counts %% are 0 to start with. load_journal(State) -> - {JournalHdl, State1} = get_journal_handle(State), - {ok, 0} = file_handle_cache:position(JournalHdl, 0), - load_journal_entries(State1). + case is_journal_present(State) of + true -> + {JournalHdl, State1} = get_journal_handle(State), + {ok, 0} = file_handle_cache:position(JournalHdl, 0), + load_journal_entries(State1); + false -> + State + end. + +is_journal_present(#qistate { journal_handle = undefined, + dir = Dir }) -> + rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)); +is_journal_present(_) -> + true. %% ditto recover_journal(State) -> |