summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2012-03-06 17:35:24 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2012-03-06 17:35:24 +0000
commit9db3cf4c401e1e5531c7d09ea42bd5456e96c806 (patch)
tree90e96a97abde9641b128a29ad8a0ea76e60a8690
parent5e4184a673a3043dd82a2f874338852c3dad3127 (diff)
downloadrabbitmq-server-9db3cf4c401e1e5531c7d09ea42bd5456e96c806.tar.gz
Add to qi the equivalent of msg_store_file:scan and rework a specialised walker so that it makes use of the generalised one. This is exported so that it can be used elsewhere.
-rw-r--r--src/rabbit_queue_index.erl50
1 files changed, 38 insertions, 12 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3d07e8b0..3ef769c7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,6 +21,8 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, recover/1]).
+-export([scan/3]).
+
-export([add_queue_ttl/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -219,6 +221,12 @@
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
+-spec(scan/3 :: (file:filename(),
+ fun ((seq_id(), rabbit_types:msg_id(),
+ rabbit_types:message_properties(), boolean(),
+ ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A),
+ A) -> A).
+
-spec(add_queue_ttl/0 :: () -> 'ok').
-endif.
@@ -378,7 +386,10 @@ all_queue_directory_names(Dir) ->
%%----------------------------------------------------------------------------
blank_state(QueueName) ->
- Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
+ blank_state_dir(
+ filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+
+blank_state_dir(Dir) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -523,19 +534,34 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
- [ok = segment_entries_foldr(
- fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
- ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
- (_RelSeq, _Value, Acc) ->
- Acc
- end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
- Seg <- all_segment_nums(State)],
- {_SegmentCounts, _State} = terminate(State),
+ State = blank_state(QueueName),
+ ok = scan_segments(
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ gatherer:in(Gatherer, {MsgId, 1});
+ (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
+ _IsAcked, Acc) ->
+ Acc
+ end, ok, State),
ok = gatherer:finish(Gatherer).
+scan(Dir, Fun, Acc) ->
+ scan_segments(Fun, Acc, blank_state_dir(Dir)).
+
+scan_segments(Fun, Acc, State) ->
+ State1 = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(State),
+ Result = lists:foldr(
+ fun (Seg, AccN) ->
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ IsDelivered, IsAcked}, AccM) ->
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ IsPersistent, IsDelivered, IsAcked, AccM)
+ end, AccN, segment_find_or_new(Seg, Dir, Segments))
+ end, Acc, all_segment_nums(State1)),
+ {_SegmentCounts, _State} = terminate(State1),
+ Result.
+
%%----------------------------------------------------------------------------
%% expiry/binary manipulation
%%----------------------------------------------------------------------------