diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-12 13:39:29 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-08-12 13:39:29 +0100 |
commit | 35dc9ebfc09e93d1095fac179e46313c61207a76 (patch) | |
tree | 1a55a30e92e935dcafe72a44c9cbcbfcfbc51b8d | |
parent | 35eb6674519def7bda973495682d85837e866d7f (diff) | |
parent | 010eadbc45a67545573069288d2840bdcd74dbb2 (diff) | |
download | rabbitmq-server-35dc9ebfc09e93d1095fac179e46313c61207a76.tar.gz |
merge default into bug 25097
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 14 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
6 files changed, 47 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6bf290de..0250902f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -576,7 +576,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), Props = message_properties(Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + ensure_ttl_timer(Props#message_properties.expiry, + State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -716,30 +717,38 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), DLXFun = dead_letter_fun(expired, State), - ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - BQS1 = case DLXFun of - undefined -> {undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - BQS2; - _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - lists:foreach( - fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, + ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, + {Props, BQS1} = + case DLXFun of + undefined -> + {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> + {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), - BQS2 - end, - ensure_ttl_timer(State#q{backing_queue_state = BQS1}). - -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) + {Next, BQS2} + end, + ensure_ttl_timer(case Props of + undefined -> undefined; + #message_properties{expiry = Exp} -> Exp + end, State#q{backing_queue_state = BQS1}). + +ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; - false -> TRef = erlang:send_after(TTL, self(), drop_expired), + false -> After = (case Expiry - now_micros() of + V when V > 0 -> V + 999; %% always fire later + _ -> 0 + end) div 1000, + TRef = erlang:send_after(After, self(), drop_expired), State#q{ttl_timer_ref = TRef} end; -ensure_ttl_timer(State) -> +ensure_ttl_timer(_Expiry, State) -> State. ack_if_no_dlx(AckTags, State = #q{dlx = undefined, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 95523bed..ed5340fe 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -124,9 +124,11 @@ %% necessitate an ack or not. If they do, the function returns a list of %% messages with the respective acktags. -callback dropwhile(msg_pred(), true, state()) - -> {[{rabbit_types:basic_message(), ack()}], state()}; + -> {rabbit_types:message_properties() | undefined, + [{rabbit_types:basic_message(), ack()}], state()}; (msg_pred(), false, state()) - -> {undefined, state()}. + -> {rabbit_types:message_properties() | undefined, + undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index a84800c0..e40d9b29 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> S#state{bqstate = BQ1}; next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [2, Res]}, + BQ = {call, erlang, element, [3, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 750bcd56..477449e3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), + {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - {Msgs, State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 } }. + {Next, Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 21bd02af..a0699a48 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2455,10 +2455,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - {undefined, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, false, VQ1), + {_, undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2475,11 +2475,11 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {undefined, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, false, VQ2), + {_, undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {undefined, VQ6} = + {_, undefined, VQ6} = rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), VQ6. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 49213c95..bd606dfb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; - (S) -> {undefined, S} + End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; + (Next, S) -> {Next, undefined, S} end, case queue_out(State) of {empty, State1} -> - End(a(State1)); + End(undefined, a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case {Pred(MsgProps), AckRequired} of {true, true} -> @@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - End(a(in_r(MsgStatus, State1))) + End(MsgProps, a(in_r(MsgStatus, State1))) end end. |