summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-21 17:53:18 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-21 17:53:18 +0000
commit96ed5762a614081576249c891c4a5b3dfd0719cc (patch)
tree928f82628eebfc0d5cc22985d0da256d23f69fbb
parent4885f41e4f37537f8fcf5c33ccdb964fcdc5bf1a (diff)
downloadrabbitmq-server-96ed5762a614081576249c891c4a5b3dfd0719cc.tar.gz
Minimal backing queue fold
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl15
-rw-r--r--src/rabbit_variable_queue.erl72
4 files changed, 83 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index abdbd24b..ddffd8be 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1178,7 +1178,12 @@ handle_call(force_event_refresh, _From,
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
emit_consumer_created(Ch, CTag, true, AckRequired)
end,
- reply(ok, State).
+ reply(ok, State);
+
+handle_call({fold, Fun, Acc}, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Acc1, BQS1} = BQ:fold(Fun, Acc, BQS),
+ reply(Acc1, State#q{backing_queue_state = BQS1}).
handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
{MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
@@ -1224,8 +1229,8 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end,
+ BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index af660c60..3f593e4a 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -145,12 +145,15 @@
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
+-callback foreach_ack(msg_fun(), state(), [ack()]) -> state().
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+-callback fold(fun((rabbit_types:basic_message(), any()) -> any()),
+ any(), state()) -> {any(), state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -212,7 +215,7 @@ behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index df733546..53d1a173 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,10 +18,10 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -301,9 +301,9 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+foreach_ack(MsgFun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -312,6 +312,11 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8a3fd9d9..5a5547ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,10 +18,10 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
+ dropwhile/3, fetch/2, ack/2, requeue/2, fold/3, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
-export([start/1, stop/0]).
@@ -591,7 +591,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1, true),
{{Msg, _, AckTag, _}, State3} =
internal_fetch(true, MsgStatus1, State2),
dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
@@ -610,7 +610,7 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1, true),
{Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
{Res, a(State3)}
end.
@@ -638,13 +638,13 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
+foreach_ack(undefined, State, _AckTags) ->
State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
+foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
lists:foldl(
fun(SeqId, State1) ->
{MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), State1),
+ read_msg(gb_trees:get(SeqId, PA), State1, true),
MsgFun(MsgStatus#msg_status.msg, SeqId),
State2
end, State, AckTags).
@@ -670,6 +670,53 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ q4 = Q4} = State) ->
+ QFun = fun(M, {A, S}) ->
+ {#msg_status{msg = Msg}, State1} = read_msg(M, S, false),
+ A1 = Fun(Msg, A),
+ {A1, State1}
+ end,
+ {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
+ {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
+ {Acc3, State3} = delta_fold (Fun, Acc2, Delta, State2),
+ {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
+ ?QUEUE:foldl(QFun, {Acc4, State4}, Q1).
+
+delta_fold(_Fun, Acc, ?BLANK_DELTA_PATTERN(X), State) ->
+ {Acc, State};
+delta_fold(Fun, Acc, #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd}, State) ->
+ {List, State1 = #vqstate { msg_store_clients = MSCState }} =
+ delta_index(DeltaSeqId, DeltaSeqIdEnd, State),
+ {Result, MSCState3} =
+ lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, _IsDelivered},
+ {Acc1, MSCState1}) ->
+ {{ok, Msg = #basic_message {}}, MSCState2} =
+ msg_store_read(MSCState1, IsPersistent, MsgId),
+ {Fun(Msg, Acc1), MSCState2}
+ end, {Acc, MSCState}, List),
+ {Result, State1 #vqstate { msg_store_clients = MSCState3}}.
+
+delta_index(DeltaSeqId, DeltaSeqIdEnd, State) ->
+ delta_index(DeltaSeqId, DeltaSeqIdEnd, State, []).
+
+delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, State, List)
+ when DeltaSeqIdDone == DeltaSeqIdEnd ->
+ {List, State};
+delta_index(DeltaSeqIdDone, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState } = State, List) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqIdDone),
+ DeltaSeqIdEnd]),
+ {List1, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqIdDone, DeltaSeqId1, IndexState),
+ delta_index(DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1 }, List ++ List1).
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
@@ -1045,7 +1092,7 @@ in_r(MsgStatus = #msg_status { msg = undefined },
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
+ read_msg(MsgStatus, State, true),
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
@@ -1066,13 +1113,14 @@ read_msg(MsgStatus = #msg_status { msg = undefined,
msg_id = MsgId,
is_persistent = IsPersistent },
State = #vqstate { ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState}) ->
+ msg_store_clients = MSCState},
+ UpdateRamCount) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
{MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + 1,
+ State #vqstate { ram_msg_count = RamMsgCount + one_if(UpdateRamCount),
msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, State) ->
+read_msg(MsgStatus, State, _UpdateRamCount) ->
{MsgStatus, State}.
internal_fetch(AckRequired, MsgStatus = #msg_status {
@@ -1348,7 +1396,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
+ read_msg(MsgStatus, State, true);
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.