diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-23 16:35:43 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-23 16:35:43 +0100 |
commit | 5e6173aa60ae4468e0dfc8481ae1378f0af5b2eb (patch) | |
tree | 571561b82e6c0e563038e4aaf88113b78d531bce | |
parent | a03f41b8beeca6306fc149894b9ae5680db5b0cf (diff) | |
download | rabbitmq-server-5e6173aa60ae4468e0dfc8481ae1378f0af5b2eb.tar.gz |
Clearer var naming in BQ:dropwhile. Fix in amqqueue_process:drop_expired_messages.
Also, corrected the dropwhile-related comments in rabbit_backing_queue.
For what concerns the fix in drop_expired_messages, I was wrongly generating
acktags for dead messages not to be dead-lettered.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
4 files changed, 21 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a370a25e..0cf7de40 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -696,14 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ }) -> Now = now_micros(), - {Msgs, BQS1} = - BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - true, BQS), DLXFun = dead_letter_fun(expired, State), - lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + case DLXFun of + undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS), + BQS1; + _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach( + fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + BQS1 + end, ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -720,9 +724,7 @@ ensure_ttl_timer(State) -> State. dead_letter_fun(_Reason, #q{dlx = undefined}) -> - fun(_Msg, _AckTag) -> - ok - end; + undefined; dead_letter_fun(Reason, _State) -> fun(Msg, AckTag) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 150c2551..28c57bb0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -120,8 +120,8 @@ %% Drop messages from the head of the queue while the supplied predicate returns %% true. Also accepts a boolean parameter that determines whether the messages -%% are to be acked or not. If they are, the messages and the acktags are -%% returned. +%% necessitate an ack or not. If they do, the function returns a list of +%% messages with the respective acktags. -callback dropwhile(msg_pred(), true, state()) -> {[{rabbit_types:basic_message(), ack()}], state()}; (msg_pred(), false, state()) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3afa5b60..551fdf18 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -168,13 +168,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, AckMsgs, +dropwhile(Pred, AckRequired, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS), + {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1}), Dropped = Len - Len1, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c418cc4d..ddbe6bcc 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -578,24 +578,24 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, AckMsgs, State) -> - End = fun(S) when AckMsgs -> {[], S}; - (S) -> {undefined, S} +dropwhile(Pred, AckRequired, State) -> + End = fun(S) when AckRequired -> {[], S}; + (S) -> {undefined, S} end, case queue_out(State) of {empty, State1} -> End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), AckMsgs} of + case {Pred(MsgProps), AckRequired} of {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {{Msg, _, AckTag, _}, State3} = internal_fetch(true, MsgStatus1, State2), - {L, State4} = dropwhile(Pred, AckMsgs, State3), + {L, State4} = dropwhile(Pred, AckRequired, State3), {[{Msg, AckTag} | L], State4}; {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, AckMsgs, State2); + dropwhile(Pred, AckRequired, State2); {false, _} -> End(a(in_r(MsgStatus, State1))) end |