summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_index.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-24 13:54:20 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-24 13:54:20 +0100
commit99d2c345265cfb4a8301d13d4c96af05931252ef (patch)
tree70d9506b6c85e2e3c60dc4dde2e1c483607c7ff2 /src/rabbit_queue_index.erl
parentaab4e202071f7433c77c3304f17f429abfe6c04d (diff)
parent7922f7d7aeaf68d5b809e7f93b6f37243354570e (diff)
downloadrabbitmq-server-99d2c345265cfb4a8301d13d4c96af05931252ef.tar.gz
merge heads
Diffstat (limited to 'src/rabbit_queue_index.erl')
-rw-r--r--src/rabbit_queue_index.erl53
1 files changed, 25 insertions, 28 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 93dce89e..02d0d8ad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/3, terminate/2, terminate_and_erase/1, publish/4,
+-export([init/3, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -188,7 +188,7 @@
-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) ->
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
--spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
+-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
-spec(publish/4 :: (guid(), seq_id(), boolean(), qistate()) -> qistate()).
-spec(deliver/2 :: (seq_id(), qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
@@ -225,11 +225,13 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
{Count, Terms, State1}.
terminate(Terms, State) ->
- terminate(true, Terms, State).
+ {SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
+ store_clean_shutdown([{segments, SegmentCounts} | Terms], Dir),
+ State1.
-terminate_and_erase(State) ->
- State1 = terminate(false, [], State),
- ok = rabbit_misc:recursive_delete([State1 #qistate.dir]),
+delete_and_terminate(State) ->
+ {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
+ ok = rabbit_misc:recursive_delete([Dir]),
State1.
publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
@@ -406,7 +408,7 @@ init_clean(RecoveredCounts, State) ->
lists:foldl(
fun ({Seg, UnackedCount}, SegmentsN) ->
Segment = segment_find_or_new(Seg, Dir, SegmentsN),
- segment_store(Segment #segment {unacked = UnackedCount },
+ segment_store(Segment #segment { unacked = UnackedCount },
SegmentsN)
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
@@ -415,7 +417,7 @@ init_clean(RecoveredCounts, State) ->
init_dirty(CleanShutdown, ContainsCheckFun, State) ->
%% Recover the journal completely. This will also load segments
- %% which have entries in the journal and remove duplicates. The
+ %% which have entries in the journal and remove duplicates. The
%% counts will correctly reflect the combination of the segment
%% and the journal.
State1 = #qistate { dir = Dir, segments = Segments } =
@@ -438,29 +440,24 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
State2 = flush_journal(State1 #qistate { segments = Segments1 }),
{Count, State2}.
-terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) ->
- State;
-terminate(StoreShutdown, Terms, State =
- #qistate { journal_handle = JournalHdl,
- dir = Dir, segments = Segments }) ->
+terminate(State = #qistate { journal_handle = JournalHdl,
+ segments = Segments }) ->
ok = case JournalHdl of
undefined -> ok;
_ -> file_handle_cache:close(JournalHdl)
end,
- SegTerms = segment_fold(
- fun (Seg, #segment { handle = Hdl,
- unacked = UnackedCount }, SegTermsAcc) ->
- ok = case Hdl of
- undefined -> ok;
- _ -> file_handle_cache:close(Hdl)
- end,
- [{Seg, UnackedCount} | SegTermsAcc]
- end, [], Segments),
- case StoreShutdown of
- true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
- false -> ok
- end,
- State #qistate { journal_handle = undefined, segments = undefined }.
+ SegmentCounts =
+ segment_fold(
+ fun (Seg, #segment { handle = Hdl, unacked = UnackedCount },
+ SegmentCountsAcc) ->
+ ok = case Hdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(Hdl)
+ end,
+ [{Seg, UnackedCount} | SegmentCountsAcc]
+ end, [], Segments),
+ {SegmentCounts, State #qistate { journal_handle = undefined,
+ segments = undefined }}.
recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
{SegEntries, UnackedCount, Segment1} = load_segment(false, Segment),
@@ -530,7 +527,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
{Guid, _SeqId, true, _IsDelivered} <- Messages],
State3
end, State, all_segment_nums(State)),
- _State = terminate(false, [], State1),
+ {_SegmentCounts, _State} = terminate(State1),
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------