diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-19 17:34:28 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-19 17:34:28 +0100 |
commit | f1bf76e40adb74134c113abaddf424ec895ee864 (patch) | |
tree | 851e61d1ebe4eb6fd1f566b5c30279cfb905ebeb | |
parent | 7dc0a3e863acb9f01319de9dddaf7b9d6499523e (diff) | |
download | rabbitmq-server-f1bf76e40adb74134c113abaddf424ec895ee864.tar.gz |
Changed BQ:dropwhile/3 to take a boolean arg to decide wether to ack or not.
I'm most likely going to change this to two different functions, the only thing
that is holding me back is that I'll end up writing one function and then two
wrappers around it in both instances (variable_queue and queue_master).
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 34 |
6 files changed, 44 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2063e557..a370a25e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -698,10 +698,12 @@ drop_expired_messages(State = #q{ttl = undefined}) -> drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = now_micros(), - BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - dead_letter_fun(expired, State), - BQS), + {Msgs, BQS1} = + BQ:dropwhile( + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + true, BQS), + DLXFun = dead_letter_fun(expired, State), + lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -718,7 +720,9 @@ ensure_ttl_timer(State) -> State. dead_letter_fun(_Reason, #q{dlx = undefined}) -> - undefined; + fun(_Msg, _AckTag) -> + ok + end; dead_letter_fun(Reason, _State) -> fun(Msg, AckTag) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6cc1c3fd..f069575f 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,6 +35,7 @@ -type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | 'undefined'). +-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the @@ -120,9 +121,10 @@ %% Drop messages from the head of the queue while the supplied %% predicate returns true. A callback function is supplied allowing %% callers access to messages that are about to be dropped. --callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), - state()) - -> state(). +-callback dropwhile(msg_pred(), true, state()) + -> {[{rabbit_types:basic_message(), ack()}], state()}; + (msg_pred(), false, state()) + -> {undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 286b69e4..78ababe8 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, true, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e6ef5c57..3afa5b60 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, MsgFun, +dropwhile(Pred, AckMsgs, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), + {Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 }. + {Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c74b8d5f..3e85e3d5 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2388,10 +2388,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - VQ2 = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, undefined, VQ1), + {undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2408,11 +2408,11 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile( - fun(_) -> false end, undefined, VQ2), + {undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). + rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c3462929..c418cc4d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,13 +16,12 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, drain_confirmed/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, + publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0, fold/3]). + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, + timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -579,23 +578,26 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, State) -> +dropwhile(Pred, AckMsgs, State) -> + End = fun(S) when AckMsgs -> {[], S}; + (S) -> {undefined, S} + end, case queue_out(State) of {empty, State1} -> - a(State1); + End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), MsgFun} of - {true, undefined} -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, MsgFun, State2); - {true, _} -> + case {Pred(MsgProps), AckMsgs} of + {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {{Msg, _, AckTag, _}, State3} = internal_fetch(true, MsgStatus1, State2), - ok = MsgFun(Msg, AckTag), - dropwhile(Pred, MsgFun, State3); + {L, State4} = dropwhile(Pred, AckMsgs, State3), + {[{Msg, AckTag} | L], State4}; + {true, false} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, AckMsgs, State2); {false, _} -> - a(in_r(MsgStatus, State1)) + End(a(in_r(MsgStatus, State1))) end end. |