diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-11-21 17:53:18 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-21 17:53:18 +0000 |
commit | 96ed5762a614081576249c891c4a5b3dfd0719cc (patch) | |
tree | 928f82628eebfc0d5cc22985d0da256d23f69fbb | |
parent | 4885f41e4f37537f8fcf5c33ccdb964fcdc5bf1a (diff) | |
download | rabbitmq-server-96ed5762a614081576249c891c4a5b3dfd0719cc.tar.gz |
Minimal backing queue fold
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 15 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 72 |
4 files changed, 83 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index abdbd24b..ddffd8be 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1178,7 +1178,12 @@ handle_call(force_event_refresh, _From, {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), emit_consumer_created(Ch, CTag, true, AckRequired) end, - reply(ok, State). + reply(ok, State); + +handle_call({fold, Fun, Acc}, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Acc1, BQS1} = BQ:fold(Fun, Acc, BQS), + reply(Acc1, State#q{backing_queue_state = BQS1}). handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), @@ -1224,8 +1229,8 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end, - BQS, AckTags), + BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, + BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index af660c60..3f593e4a 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -145,12 +145,15 @@ %% Acktags supplied are for messages which should be processed. The %% provided callback function is called with each message. --callback fold(msg_fun(), state(), [ack()]) -> state(). +-callback foreach_ack(msg_fun(), state(), [ack()]) -> state(). %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +-callback fold(fun((rabbit_types:basic_message(), any()) -> any()), + any(), state()) -> {any(), state()}. + %% How long is my queue? -callback len(state()) -> non_neg_integer(). @@ -212,7 +215,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, + {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index df733546..53d1a173 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,10 +18,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, + requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, fold/3]). + status/1, invoke/3, is_duplicate/2, foreach_ack/3]). -export([start/1, stop/0]). @@ -301,9 +301,9 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -fold(MsgFun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }, AckTags) -> - State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }. +foreach_ack(MsgFun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }. requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, @@ -312,6 +312,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +fold(Fun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {Result, BQS1} = BQ:fold(Fun, Acc, BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:len(BQS). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8a3fd9d9..5a5547ae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,10 +18,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). -export([start/1, stop/0]). @@ -591,7 +591,7 @@ dropwhile(Pred, AckRequired, State, Msgs) -> {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case {Pred(MsgProps), AckRequired} of {true, true} -> - {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {MsgStatus1, State2} = read_msg(MsgStatus, State1, true), {{Msg, _, AckTag, _}, State3} = internal_fetch(true, MsgStatus1, State2), dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); @@ -610,7 +610,7 @@ fetch(AckRequired, State) -> {{value, MsgStatus}, State1} -> %% it is possible that the message wasn't read from disk %% at this point, so read it in. - {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {MsgStatus1, State2} = read_msg(MsgStatus, State1, true), {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), {Res, a(State3)} end. @@ -638,13 +638,13 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -fold(undefined, State, _AckTags) -> +foreach_ack(undefined, State, _AckTags) -> State; -fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> +foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> lists:foldl( fun(SeqId, State1) -> {MsgStatus, State2} = - read_msg(gb_trees:get(SeqId, PA), State1), + read_msg(gb_trees:get(SeqId, PA), State1, true), MsgFun(MsgStatus#msg_status.msg, SeqId), State2 end, State, AckTags). @@ -670,6 +670,53 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +fold(Fun, Acc, #vqstate { q1 = Q1, + q2 = Q2, + delta = Delta, + q3 = Q3, + q4 = Q4} = State) -> + QFun = fun(M, {A, S}) -> + {#msg_status{msg = Msg}, State1} = read_msg(M, S, false), + A1 = Fun(Msg, A), + {A1, State1} + end, + {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), + {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), + {Acc3, State3} = delta_fold (Fun, Acc2, Delta, State2), + {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), + ?QUEUE:foldl(QFun, {Acc4, State4}, Q1). + +delta_fold(_Fun, Acc, ?BLANK_DELTA_PATTERN(X), State) -> + {Acc, State}; +delta_fold(Fun, Acc, #delta { start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd}, State) -> + {List, State1 = #vqstate { msg_store_clients = MSCState }} = + delta_index(DeltaSeqId, DeltaSeqIdEnd, State), + {Result, MSCState3} = + lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, _IsDelivered}, + {Acc1, MSCState1}) -> + {{ok, Msg = #basic_message {}}, MSCState2} = + msg_store_read(MSCState1, IsPersistent, MsgId), + {Fun(Msg, Acc1), MSCState2} + end, {Acc, MSCState}, List), + {Result, State1 #vqstate { msg_store_clients = MSCState3}}. + +delta_index(DeltaSeqId, DeltaSeqIdEnd, State) -> + delta_index(DeltaSeqId, DeltaSeqIdEnd, State, []). + +delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, State, List) + when DeltaSeqIdDone == DeltaSeqIdEnd -> + {List, State}; +delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, + #vqstate { index_state = IndexState } = State, List) -> + DeltaSeqId1 = lists:min( + [rabbit_queue_index:next_segment_boundary(DeltaSeqIdDone), + DeltaSeqIdEnd]), + {List1, IndexState1} = + rabbit_queue_index:read(DeltaSeqIdDone, DeltaSeqId1, IndexState), + delta_index(DeltaSeqId1, DeltaSeqIdEnd, + State #vqstate { index_state = IndexState1 }, List ++ List1). + len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -1045,7 +1092,7 @@ in_r(MsgStatus = #msg_status { msg = undefined }, case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), + read_msg(MsgStatus, State, true), State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) } end; in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> @@ -1066,13 +1113,14 @@ read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, State = #vqstate { ram_msg_count = RamMsgCount, - msg_store_clients = MSCState}) -> + msg_store_clients = MSCState}, + UpdateRamCount) -> {{ok, Msg = #basic_message {}}, MSCState1} = msg_store_read(MSCState, IsPersistent, MsgId), {MsgStatus #msg_status { msg = Msg }, - State #vqstate { ram_msg_count = RamMsgCount + 1, + State #vqstate { ram_msg_count = RamMsgCount + one_if(UpdateRamCount), msg_store_clients = MSCState1 }}; -read_msg(MsgStatus, State) -> +read_msg(MsgStatus, State, _UpdateRamCount) -> {MsgStatus, State}. internal_fetch(AckRequired, MsgStatus = #msg_status { @@ -1348,7 +1396,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - read_msg(MsgStatus, State); + read_msg(MsgStatus, State, true); publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) -> {MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}. |