diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-12-04 11:17:25 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-12-04 11:17:25 +0000 |
commit | 704bcec128c0f139d536f261ca29c75fa7fba3b7 (patch) | |
tree | a91d82f5da38753b6e5afd52f092a72f513acf41 | |
parent | fb7011f1e3b1487431b0e72031f4d844e53041af (diff) | |
parent | a0cf7e4ab2261a85299deaaa3ae6229795c2df28 (diff) | |
download | rabbitmq-server-704bcec128c0f139d536f261ca29c75fa7fba3b7.tar.gz |
Merged bug25328
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 32 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 34 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 66 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 40 |
6 files changed, 123 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 74717ace..2ffa2a1a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -725,14 +725,15 @@ drop_expired_messages(State = #q{dlx = DLX, Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, BQS1} = case DLX of - undefined -> {Next, undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - {Next, BQS2}; - _ -> {Next, Msgs, BQS2} = - BQ:dropwhile(ExpirePred, true, BQS), + undefined -> BQ:dropwhile(ExpirePred, BQS); + _ -> {Next, Msgs, BQS2} = + BQ:fetchwhile(ExpirePred, + fun accumulate_msgs/4, + [], BQS), case Msgs of [] -> ok; - _ -> (dead_letter_fun(expired))(Msgs) + _ -> (dead_letter_fun(expired))( + lists:reverse(Msgs)) end, {Next, BQS2} end, @@ -741,6 +742,8 @@ drop_expired_messages(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). +accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. + ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 96c58cb9..272df5c1 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -124,16 +124,25 @@ %% be ignored. -callback drain_confirmed(state()) -> {msg_ids(), state()}. -%% Drop messages from the head of the queue while the supplied predicate returns -%% true. Also accepts a boolean parameter that determines whether the messages -%% necessitate an ack or not. If they do, the function returns a list of -%% messages with the respective acktags. --callback dropwhile(msg_pred(), true, state()) - -> {rabbit_types:message_properties() | undefined, - [{rabbit_types:basic_message(), ack()}], state()}; - (msg_pred(), false, state()) - -> {rabbit_types:message_properties() | undefined, - undefined, state()}. +%% Drop messages from the head of the queue while the supplied +%% predicate on message properties returns true. Returns the first +%% message properties for which the predictate returned false, or +%% 'undefined' if the whole backing queue was traversed w/o the +%% predicate ever returning false. +-callback dropwhile(msg_pred(), state()) + -> {rabbit_types:message_properties() | undefined, state()}. + +%% Like dropwhile, except messages are fetched in "require +%% acknowledgement" mode and are passed, together with their Delivered +%% flag and ack tag, to the supplied function. The function is also +%% fed an accumulator. The result of fetchwhile is as for dropwhile +%% plus the accumulator. +-callback fetchwhile(msg_pred(), + fun ((rabbit_types:basic_message(), boolean(), ack(), A) + -> A), + A, state()) + -> {rabbit_types:message_properties() | undefined, + A, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; @@ -222,7 +231,8 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, - {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index a7e0a5e7..5b3b8aa8 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -147,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. @@ -262,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> S#state{bqstate = BQ1}; next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [3, Res]}, + BQ = {call, erlang, element, [2, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 009971bb..e3d967bc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -20,7 +20,7 @@ purge/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, - dropwhile/3, set_ram_duration_target/2, ram_duration/1, + dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, foreach_ack/3]). @@ -211,19 +211,17 @@ discard(MsgId, ChPid, State = #state { gm = GM, State end. -dropwhile(Pred, AckRequired, - State = #state{gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> +dropwhile(Pred, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), - Len1 = BQ:len(BQS1), - Dropped = Len - Len1, - case Dropped of - 0 -> ok; - _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}) - end, - {Next, Msgs, State #state { backing_queue_state = BQS1 } }. + {Next, BQS1} = BQ:dropwhile(Pred, BQS), + {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}. + +fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS), + {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -435,6 +433,16 @@ drop_one(AckTag, State = #state { gm = GM, ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), State. +drop(PrevLen, AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + case PrevLen - Len of + 0 -> State; + Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}), + State + end. + ensure_monitoring(ChPid, State = #state { coordinator = CPid, known_senders = KS }) -> case sets:is_element(ChPid, KS) of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index df8544a4..2226f445 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2307,8 +2307,9 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_drop/1, fun test_variable_queue_fold_msg_on_disk/1, - fun test_dropwhile/1, + fun test_dropfetchwhile/1, fun test_dropwhile_varying_ram_duration/1, + fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, fun test_variable_queue_requeue/1, fun test_variable_queue_fold/1]], @@ -2409,41 +2410,70 @@ test_drop(VQ0) -> true = rabbit_variable_queue:is_empty(VQ5), VQ5. -test_dropwhile(VQ0) -> +test_dropfetchwhile(VQ0) -> Count = 10, %% add messages with sequential expiry VQ1 = variable_queue_publish( false, Count, - fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), + fun (N, Props) -> Props#message_properties{expiry = N} end, + fun erlang:term_to_binary/1, VQ0), + + %% fetch the first 5 messages + {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = + rabbit_variable_queue:fetchwhile( + fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, + fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) -> + {[Msg | MsgAcc], [AckTag | AckAcc]} + end, {[], []}, VQ1), + true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], + + %% requeue them + {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2), %% drop the first 5 messages - {_, undefined, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, false, VQ1), - - %% fetch five now - VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _}, VQM} = + {#message_properties{expiry = 6}, VQ4} = + rabbit_variable_queue:dropwhile( + fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3), + + %% fetch 5 + VQ5 = lists:foldl(fun (N, VQN) -> + {{Msg, _, _}, VQM} = rabbit_variable_queue:fetch(false, VQN), + true = msg2int(Msg) == N, VQM - end, VQ2, lists:seq(6, Count)), + end, VQ4, lists:seq(6, Count)), %% should be empty now - {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + true = rabbit_variable_queue:is_empty(VQ5), - VQ4. + VQ5. test_dropwhile_varying_ram_duration(VQ0) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, VQ2} = rabbit_variable_queue:dropwhile( + fun (_) -> false end, VQ1), + VQ2 + end, VQ0). + +test_fetchwhile_varying_ram_duration(VQ0) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( + fun (_) -> false end, + fun (_, _, _, A) -> A end, + ok, VQ1), + VQ2 + end, VQ0). + +test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {_, undefined, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, false, VQ2), + VQ3 = Fun(VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {_, undefined, VQ6} = - rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), + VQ6 = Fun(VQ5), VQ6. test_variable_queue_dynamic_duration_change(VQ0) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30ab96f5..3e4c7c86 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + dropwhile/2, fetchwhile/4, + fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). @@ -577,27 +578,30 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). +dropwhile(Pred, State) -> + case queue_out(State) of + {empty, State1} -> + {undefined, a(State1)}; + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> {MsgProps, a(in_r(MsgStatus, State1))} + end + end. -dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; - (Next, S) -> {Next, undefined, S} - end, +fetchwhile(Pred, Fun, Acc, State) -> case queue_out(State) of {empty, State1} -> - End(undefined, a(State1)); + {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), AckRequired} of - {true, true} -> - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _IsDelivered, AckTag}, State3} = - internal_fetch(true, MsgStatus1, State2), - dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); - {true, false} -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, AckRequired, State2, undefined); - {false, _} -> - End(MsgProps, a(in_r(MsgStatus, State1))) + case Pred(MsgProps) of + true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, IsDelivered, AckTag}, State3} = + internal_fetch(true, MsgStatus1, State2), + Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), + fetchwhile(Pred, Fun, Acc1, State3); + false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end end. |