diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-01-02 12:12:55 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-01-02 12:12:55 +0000 |
commit | 19e7e08848518c3432fa640fd56eb82119a8d5b3 (patch) | |
tree | 6e028ac36ef89fa43647919e0c44ec039914eeb1 | |
parent | a545a3b732e9f20ab860d0b25d99c704d5770dd4 (diff) | |
parent | 0c603bc18f3276b7e1a72712f63f32aeb6de35e0 (diff) | |
download | rabbitmq-server-19e7e08848518c3432fa640fd56eb82119a8d5b3.tar.gz |
Merged bug25372
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 26 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 15 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
5 files changed, 42 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ce3b4ce8..f9614517 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,7 +720,7 @@ drop_expired_msgs(State = #q{dlx = DLX, undefined -> BQ:dropwhile(ExpirePred, BQS); _ -> {Next, Msgs, BQS2} = BQ:fetchwhile(ExpirePred, - fun accumulate_msgs/4, + fun accumulate_msgs/3, [], BQS), case Msgs of [] -> ok; @@ -734,7 +734,7 @@ drop_expired_msgs(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). -accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. +accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc]. ensure_ttl_timer(undefined, State) -> State; @@ -1202,8 +1202,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:foreach_ack(fun(M, A) -> DLXFun([{M, A}]) end, - BQS, AckTags), + {ok, BQS1} = BQ:ackfold( + fun (M, A, ok) -> DLXFun([{M, A}]) end, + ok, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 272df5c1..99b5946e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,8 +35,7 @@ fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | - 'undefined'). +-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues @@ -133,14 +132,11 @@ -> {rabbit_types:message_properties() | undefined, state()}. %% Like dropwhile, except messages are fetched in "require -%% acknowledgement" mode and are passed, together with their Delivered -%% flag and ack tag, to the supplied function. The function is also -%% fed an accumulator. The result of fetchwhile is as for dropwhile -%% plus the accumulator. --callback fetchwhile(msg_pred(), - fun ((rabbit_types:basic_message(), boolean(), ack(), A) - -> A), - A, state()) +%% acknowledgement" mode and are passed, together with their ack tag, +%% to the supplied function. The function is also fed an +%% accumulator. The result of fetchwhile is as for dropwhile plus the +%% accumulator. +-callback fetchwhile(msg_pred(), msg_fun(A), A, state()) -> {rabbit_types:message_properties() | undefined, A, state()}. @@ -156,14 +152,14 @@ %% about. Must return 1 msg_id per Ack, in the same order as Acks. -callback ack([ack()], state()) -> {msg_ids(), state()}. -%% Acktags supplied are for messages which should be processed. The -%% provided callback function is called with each message. --callback foreach_ack(msg_fun(), state(), [ack()]) -> state(). - %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. -callback requeue([ack()], state()) -> {msg_ids(), state()}. +%% Fold over messages by ack tag. The supplied function is called with +%% each message, its ack tag, and an accumulator. +-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. + %% Fold over all the messages in a queue and return the accumulated %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), @@ -233,7 +229,7 @@ behaviour_info(callbacks) -> {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, - {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, + {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e3d967bc..e857f395 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,11 +18,11 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/5, publish_delivered/4, - discard/3, fetch/2, drop/2, ack/2, - requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, + discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, + len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, foreach_ack/3]). + status/1, invoke/3, is_duplicate/2]). -export([start/1, stop/0]). @@ -281,10 +281,6 @@ ack(AckTags, State = #state { gm = GM, end, {MsgIds, State #state { backing_queue_state = BQS1 }}. -foreach_ack(MsgFun, State = #state { backing_queue = BQ, - backing_queue_state = BQS }, AckTags) -> - State #state { backing_queue_state = BQ:foreach_ack(MsgFun, BQS, AckTags) }. - requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> @@ -292,6 +288,11 @@ requeue(AckTags, State = #state { gm = GM, ok = gm:broadcast(GM, {requeue, MsgIds}), {MsgIds, State #state { backing_queue_state = BQS1 }}. +ackfold(MsgFun, Acc, State = #state { backing_queue = BQ, + backing_queue_state = BQS }, AckTags) -> + {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags), + {Acc1, State #state { backing_queue_state = BQS1 }}. + fold(Fun, Acc, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:fold(Fun, Acc, BQS), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b499c59b..09ed3d08 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2434,7 +2434,7 @@ test_dropfetchwhile(VQ0) -> {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = rabbit_variable_queue:fetchwhile( fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, - fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) -> + fun (Msg, AckTag, {MsgAcc, AckAcc}) -> {[Msg | MsgAcc], [AckTag | AckAcc]} end, {[], []}, VQ1), true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], @@ -2473,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) -> fun (VQ1) -> {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( fun (_) -> false end, - fun (_, _, _, A) -> A end, + fun (_, _, A) -> A end, ok, VQ1), VQ2 end, VQ0). @@ -2608,8 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_variable_queue_fold_msg_on_disk(VQ0) -> VQ1 = variable_queue_publish(true, 1, VQ0), {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), - VQ3 = rabbit_variable_queue:foreach_ack(fun (_M, _A) -> ok end, - VQ2, AckTags), + {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end, + ok, VQ2, AckTags), VQ3. test_queue_recover() -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8e23b591..eabfe136 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,10 +19,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, - fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). + is_duplicate/2, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -597,10 +597,9 @@ fetchwhile(Pred, Fun, Acc, State) -> {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, IsDelivered, AckTag}, State3} = + {{Msg, _IsDelivered, AckTag}, State3} = remove(true, MsgStatus1, State2), - Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), - fetchwhile(Pred, Fun, Acc1, State3); + fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end end. @@ -650,16 +649,6 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -foreach_ack(undefined, State, _AckTags) -> - State; -foreach_ack(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> - a(lists:foldl(fun(SeqId, State1) -> - {MsgStatus, State2} = - read_msg(gb_trees:get(SeqId, PA), false, State1), - MsgFun(MsgStatus#msg_status.msg, SeqId), - State2 - end, State, AckTags)). - requeue(AckTags, #vqstate { delta = Delta, q3 = Q3, q4 = Q4, @@ -681,6 +670,16 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +ackfold(MsgFun, Acc, State, AckTags) -> + {AccN, StateN} = + lists:foldl( + fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) -> + {#msg_status { msg = Msg }, State1} = + read_msg(gb_trees:get(SeqId, PA), false, State0), + {MsgFun(Msg, SeqId, Acc0), State1} + end, {Acc, State}, AckTags), + {AccN, a(StateN)}. + fold(Fun, Acc, #vqstate { q1 = Q1, q2 = Q2, delta = #delta { start_seq_id = DeltaSeqId, |