diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2012-03-06 17:35:24 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2012-03-06 17:35:24 +0000 |
commit | 9db3cf4c401e1e5531c7d09ea42bd5456e96c806 (patch) | |
tree | 90e96a97abde9641b128a29ad8a0ea76e60a8690 | |
parent | 5e4184a673a3043dd82a2f874338852c3dad3127 (diff) | |
download | rabbitmq-server-9db3cf4c401e1e5531c7d09ea42bd5456e96c806.tar.gz |
Add to qi the equivalent of msg_store_file:scan and rework a specialised walker so that it makes use of the generalised one. This is exported so that it can be used elsewhere.
-rw-r--r-- | src/rabbit_queue_index.erl | 50 |
1 files changed, 38 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3d07e8b0..3ef769c7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,6 +21,8 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). +-export([scan/3]). + -export([add_queue_ttl/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -219,6 +221,12 @@ {non_neg_integer(), non_neg_integer(), qistate()}). -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). +-spec(scan/3 :: (file:filename(), + fun ((seq_id(), rabbit_types:msg_id(), + rabbit_types:message_properties(), boolean(), + ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A), + A) -> A). + -spec(add_queue_ttl/0 :: () -> 'ok'). -endif. @@ -378,7 +386,10 @@ all_queue_directory_names(Dir) -> %%---------------------------------------------------------------------------- blank_state(QueueName) -> - Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)), + blank_state_dir( + filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). + +blank_state_dir(Dir) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -523,19 +534,34 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), - [ok = segment_entries_foldr( - fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack}, - ok) -> - gatherer:in(Gatherer, {MsgId, 1}); - (_RelSeq, _Value, Acc) -> - Acc - end, ok, segment_find_or_new(Seg, Dir, Segments)) || - Seg <- all_segment_nums(State)], - {_SegmentCounts, _State} = terminate(State), + State = blank_state(QueueName), + ok = scan_segments( + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + gatherer:in(Gatherer, {MsgId, 1}); + (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, + _IsAcked, Acc) -> + Acc + end, ok, State), ok = gatherer:finish(Gatherer). +scan(Dir, Fun, Acc) -> + scan_segments(Fun, Acc, blank_state_dir(Dir)). + +scan_segments(Fun, Acc, State) -> + State1 = #qistate { segments = Segments, dir = Dir } = + recover_journal(State), + Result = lists:foldr( + fun (Seg, AccN) -> + segment_entries_foldr( + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + IsDelivered, IsAcked}, AccM) -> + Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + IsPersistent, IsDelivered, IsAcked, AccM) + end, AccN, segment_find_or_new(Seg, Dir, Segments)) + end, Acc, all_segment_nums(State1)), + {_SegmentCounts, _State} = terminate(State1), + Result. + %%---------------------------------------------------------------------------- %% expiry/binary manipulation %%---------------------------------------------------------------------------- |