diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-05-24 13:54:20 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-05-24 13:54:20 +0100 |
commit | 99d2c345265cfb4a8301d13d4c96af05931252ef (patch) | |
tree | 70d9506b6c85e2e3c60dc4dde2e1c483607c7ff2 /src/rabbit_queue_index.erl | |
parent | aab4e202071f7433c77c3304f17f429abfe6c04d (diff) | |
parent | 7922f7d7aeaf68d5b809e7f93b6f37243354570e (diff) | |
download | rabbitmq-server-99d2c345265cfb4a8301d13d4c96af05931252ef.tar.gz |
merge heads
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r-- | src/rabbit_queue_index.erl | 53 |
1 files changed, 25 insertions, 28 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 93dce89e..02d0d8ad 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/3, terminate/2, terminate_and_erase/1, publish/4, +-export([init/3, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -188,7 +188,7 @@ -spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). --spec(terminate_and_erase/1 :: (qistate()) -> qistate()). +-spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()). -spec(deliver/2 :: (seq_id(), qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). @@ -225,11 +225,13 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> {Count, Terms, State1}. terminate(Terms, State) -> - terminate(true, Terms, State). + {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir), + State1. -terminate_and_erase(State) -> - State1 = terminate(false, [], State), - ok = rabbit_misc:recursive_delete([State1 #qistate.dir]), +delete_and_terminate(State) -> + {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State), + ok = rabbit_misc:recursive_delete([Dir]), State1. publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> @@ -406,7 +408,7 @@ init_clean(RecoveredCounts, State) -> lists:foldl( fun ({Seg, UnackedCount}, SegmentsN) -> Segment = segment_find_or_new(Seg, Dir, SegmentsN), - segment_store(Segment #segment {unacked = UnackedCount }, + segment_store(Segment #segment { unacked = UnackedCount }, SegmentsN) end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the @@ -415,7 +417,7 @@ init_clean(RecoveredCounts, State) -> init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments - %% which have entries in the journal and remove duplicates. The + %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment %% and the journal. State1 = #qistate { dir = Dir, segments = Segments } = @@ -438,29 +440,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> State2 = flush_journal(State1 #qistate { segments = Segments1 }), {Count, State2}. -terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) -> - State; -terminate(StoreShutdown, Terms, State = - #qistate { journal_handle = JournalHdl, - dir = Dir, segments = Segments }) -> +terminate(State = #qistate { journal_handle = JournalHdl, + segments = Segments }) -> ok = case JournalHdl of undefined -> ok; _ -> file_handle_cache:close(JournalHdl) end, - SegTerms = segment_fold( - fun (Seg, #segment { handle = Hdl, - unacked = UnackedCount }, SegTermsAcc) -> - ok = case Hdl of - undefined -> ok; - _ -> file_handle_cache:close(Hdl) - end, - [{Seg, UnackedCount} | SegTermsAcc] - end, [], Segments), - case StoreShutdown of - true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); - false -> ok - end, - State #qistate { journal_handle = undefined, segments = undefined }. + SegmentCounts = + segment_fold( + fun (Seg, #segment { handle = Hdl, unacked = UnackedCount }, + SegmentCountsAcc) -> + ok = case Hdl of + undefined -> ok; + _ -> file_handle_cache:close(Hdl) + end, + [{Seg, UnackedCount} | SegmentCountsAcc] + end, [], Segments), + {SegmentCounts, State #qistate { journal_handle = undefined, + segments = undefined }}. recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> {SegEntries, UnackedCount, Segment1} = load_segment(false, Segment), @@ -530,7 +527,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> {Guid, _SeqId, true, _IsDelivered} <- Messages], State3 end, State, all_segment_nums(State)), - _State = terminate(false, [], State1), + {_SegmentCounts, _State} = terminate(State1), ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- |